You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/09/19 17:07:20 UTC
[2/3] cassandra git commit: Allow MV's SELECT to restrict PK columns
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index 3abffd5..d1f5272 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.service.pager.PagingState;
*/
public interface ReadQuery
{
- public static final ReadQuery EMPTY = new ReadQuery()
+ ReadQuery EMPTY = new ReadQuery()
{
public ReadOrderGroup startOrderGroup()
{
@@ -67,6 +67,16 @@ public interface ReadQuery
{
return QueryPager.EMPTY;
}
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return false;
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return false;
+ }
};
/**
@@ -116,4 +126,16 @@ public interface ReadQuery
* @return The limits for the query.
*/
public DataLimits limits();
+
+ /**
+ * @return true if the read query would select the given key, including checks against any partition key
+ * restrictions in the row filter
+ */
+ public boolean selectsKey(DecoratedKey key);
+
+ /**
+ * @return true if the read query would select the given clustering, including checks against any clustering
+ * column restrictions in the row filter
+ */
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 49cf07c..a8e37b4 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.cache.RowCacheKey;
import org.apache.cassandra.cache.RowCacheSentinel;
@@ -190,15 +191,23 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
return DatabaseDescriptor.getReadRpcTimeout();
}
- public boolean selects(DecoratedKey partitionKey, Clustering clustering)
+ public boolean selectsKey(DecoratedKey key)
{
- if (!partitionKey().equals(partitionKey))
+ if (!this.partitionKey().equals(key))
return false;
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
if (clustering == Clustering.STATIC_CLUSTERING)
return !columnFilter().fetchedColumns().statics.isEmpty();
- return clusteringIndexFilter().selects(clustering);
+ if (!clusteringIndexFilter().selects(clustering))
+ return false;
+
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
}
/**
@@ -503,6 +512,16 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
return new MultiPartitionPager(this, pagingState, protocolVersion);
}
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return Iterables.any(commands, c -> c.selectsKey(key));
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index b5968d5..0ff30af 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -115,6 +115,45 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
/**
+ * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by
+ * the given key, false otherwise.
+ */
+ public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, AbstractType<?> keyValidator)
+ {
+ for (Expression e : expressions)
+ {
+ if (!e.column.isPartitionKey())
+ continue;
+
+ ByteBuffer value = keyValidator instanceof CompositeType
+ ? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()]
+ : key.getKey();
+ if (!e.operator().isSatisfiedBy(e.column.type, value, e.value))
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Returns true if all of the expressions within this filter that apply to the clustering key are satisfied by
+ * the given Clustering, false otherwise.
+ */
+ public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering clustering)
+ {
+ for (Expression e : expressions)
+ {
+ if (!e.column.isClusteringColumn())
+ continue;
+
+ if (!e.operator().isSatisfiedBy(e.column.type, clustering.get(e.column.position()), e.value))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Returns this filter but without the provided expression. This method
* *assumes* that the filter contains the provided expression.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 6eb9071..46dc3fa 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.CFMetaData;
@@ -94,6 +95,18 @@ public class TemporalRow
this.isNew = isNew;
}
+ @Override
+ public String toString()
+ {
+ return MoreObjects.toStringHelper(this)
+ .add("value", value == null ? "null" : ByteBufferUtil.bytesToHex(value))
+ .add("timestamp", timestamp)
+ .add("ttl", ttl)
+ .add("localDeletionTime", localDeletionTime)
+ .add("isNew", isNew)
+ .toString();
+ }
+
public TemporalCell reconcile(TemporalCell that)
{
int now = FBUtilities.nowInSeconds();
@@ -208,13 +221,13 @@ public class TemporalRow
if (cell.isNew)
{
- assert newCell == null || newCell.equals(cell) : "Only one cell version can be marked New";
+ assert newCell == null || newCell.equals(cell) : "Only one cell version can be marked New; newCell: " + newCell + ", cell: " + cell;
newCell = cell;
numSet = existingCell == null ? 1 : 2;
}
else
{
- assert existingCell == null || existingCell.equals(cell) : "Only one cell version can be marked Existing";
+ assert existingCell == null || existingCell.equals(cell) : "Only one cell version can be marked Existing; existingCell: " + existingCell + ", cell: " + cell;
existingCell = cell;
numSet = newCell == null ? 1 : 2;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 28ec489..0a7f747 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -18,53 +18,30 @@
package org.apache.cassandra.db.view;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ViewDefinition;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.AbstractReadCommandBuilder;
import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeRow;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.ColumnData;
-import org.apache.cassandra.db.rows.ComplexColumnData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.FBUtilities;
/**
* A View copies data from a base table into a view table which can be queried independently from the
@@ -111,6 +88,13 @@ public class View
private final boolean includeAllColumns;
private ViewBuilder builder;
+ // Only the raw statement can be final, because the statement cannot always be prepared when the MV is initialized.
+ // For example, during startup, this view will be initialized as part of the Keyspace.open() work; preparing a statement
+ // also requires the keyspace to be open, so this results in double-initialization problems.
+ private final SelectStatement.RawStatement rawSelect;
+ private SelectStatement select;
+ private ReadQuery query;
+
public View(ViewDefinition definition,
ColumnFamilyStore baseCfs)
{
@@ -120,6 +104,7 @@ public class View
includeAllColumns = definition.includeAllColumns;
viewHasAllPrimaryKeys = updateDefinition(definition);
+ this.rawSelect = definition.select;
}
public ViewDefinition getDefinition()
@@ -205,9 +190,9 @@ public class View
*/
public boolean updateAffectsView(AbstractBTreePartition partition)
{
- // If we are including all of the columns, then any update will be included
- if (includeAllColumns)
- return true;
+ ReadQuery selectQuery = getReadQuery();
+ if (!selectQuery.selectsKey(partition.partitionKey()))
+ return false;
// If there are range tombstones, tombstones will also need to be generated for the view
// This requires a query of the base rows and generating tombstones for all of those values
@@ -217,7 +202,10 @@ public class View
// Check each row for deletion or update
for (Row row : partition)
{
- if (!row.deletion().isLive())
+ if (!selectQuery.selectsClustering(partition.partitionKey(), row.clustering()))
+ continue;
+
+ if (includeAllColumns || viewHasAllPrimaryKeys || !row.deletion().isLive())
return true;
if (row.primaryKeyLivenessInfo().isLive(FBUtilities.nowInSeconds()))
@@ -440,7 +428,7 @@ public class View
if (!deletionInfo.getPartitionDeletion().isLive())
{
- command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
+ command = getSelectStatement().internalReadForView(dk, rowSet.nowInSec);
}
else
{
@@ -459,11 +447,15 @@ public class View
if (command == null)
{
+ ReadQuery selectQuery = getReadQuery();
SinglePartitionSliceBuilder builder = null;
for (Row row : partition)
{
if (!row.deletion().isLive())
{
+ if (!selectQuery.selectsClustering(rowSet.dk, row.clustering()))
+ continue;
+
if (builder == null)
builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
builder.addSlice(Slice.make(row.clustering()));
@@ -476,10 +468,10 @@ public class View
if (command != null)
{
+ ReadQuery selectQuery = getReadQuery();
+ assert selectQuery.selectsKey(rowSet.dk);
- //We may have already done this work for
- //another MV update so check
-
+ // We may have already done this work for another MV update so check
if (!rowSet.hasTombstonedExisting())
{
QueryPager pager = command.getPager(null, Server.CURRENT_VERSION);
@@ -498,7 +490,8 @@ public class View
while (rowIterator.hasNext())
{
Row row = rowIterator.next();
- rowSet.addRow(row, false);
+ if (selectQuery.selectsClustering(rowSet.dk, row.clustering()))
+ rowSet.addRow(row, false);
}
}
}
@@ -609,6 +602,34 @@ public class View
return rowSet;
}
+ /**
+ * Returns the SelectStatement used to populate and filter this view. Internal users should access the select
+ * statement this way to ensure it has been prepared.
+ */
+ public SelectStatement getSelectStatement()
+ {
+ if (select == null)
+ {
+ ClientState state = ClientState.forInternalCalls();
+ state.setKeyspace(baseCfs.keyspace.getName());
+ rawSelect.prepareKeyspace(state);
+ ParsedStatement.Prepared prepared = rawSelect.prepare(true);
+ select = (SelectStatement) prepared.statement;
+ }
+
+ return select;
+ }
+
+ /**
+ * Returns the ReadQuery used to filter this view. Internal users should access the query this way to ensure it
+ * has been prepared.
+ */
+ public ReadQuery getReadQuery()
+ {
+ if (query == null)
+ query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds());
+ return query;
+ }
/**
* @param isBuilding If the view is currently being built, we do not query the values which are already stored,
@@ -683,4 +704,55 @@ public class View
final UUID baseId = Schema.instance.getId(keyspace, baseTable);
return Iterables.filter(ksm.views, view -> view.baseTableId.equals(baseId));
}
+
+ /**
+ * Builds the string text for a materialized view's SELECT statement.
+ */
+ public static String buildSelectStatement(String cfName, Collection<ColumnDefinition> includedColumns, String whereClause)
+ {
+ StringBuilder rawSelect = new StringBuilder("SELECT ");
+ if (includedColumns == null || includedColumns.isEmpty())
+ rawSelect.append("*");
+ else
+ rawSelect.append(includedColumns.stream().map(id -> id.name.toCQLString()).collect(Collectors.joining(", ")));
+ rawSelect.append(" FROM \"").append(cfName).append("\" WHERE ") .append(whereClause).append(" ALLOW FILTERING");
+ return rawSelect.toString();
+ }
+
+ public static String relationsToWhereClause(List<Relation> whereClause)
+ {
+ List<String> expressions = new ArrayList<>(whereClause.size());
+ for (Relation rel : whereClause)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ if (rel.isMultiColumn())
+ {
+ sb.append(((MultiColumnRelation) rel).getEntities().stream()
+ .map(ColumnIdentifier.Raw::toCQLString)
+ .collect(Collectors.joining(", ", "(", ")")));
+ }
+ else
+ {
+ sb.append(((SingleColumnRelation) rel).getEntity().toCQLString());
+ }
+
+ sb.append(" ").append(rel.operator()).append(" ");
+
+ if (rel.isIN())
+ {
+ sb.append(rel.getInValues().stream()
+ .map(Term.Raw::getText)
+ .collect(Collectors.joining(", ", "(", ")")));
+ }
+ else
+ {
+ sb.append(rel.getValue().getText());
+ }
+
+ expressions.add(sb.toString());
+ }
+
+ return expressions.stream().collect(Collectors.joining(" AND "));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index f0b01c7..0a0fe08 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -29,12 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
@@ -44,7 +39,6 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.StorageProxy;
@@ -52,7 +46,6 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -77,7 +70,11 @@ public class ViewBuilder extends CompactionInfo.Holder
private void buildKey(DecoratedKey key)
{
- QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null, Server.CURRENT_VERSION);
+ ReadQuery selectQuery = view.getReadQuery();
+ if (!selectQuery.selectsKey(key))
+ return;
+
+ QueryPager pager = view.getSelectStatement().internalReadForView(key, FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION);
while (!pager.isExhausted())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 77867fc..f1751f5 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -51,7 +51,7 @@ public class CompositesSearcher extends CassandraIndexSearcher
private boolean isMatchingEntry(DecoratedKey partitionKey, IndexEntry entry, ReadCommand command)
{
- return command.selects(partitionKey, entry.indexedEntryClustering);
+ return command.selectsKey(partitionKey) && command.selectsClustering(partitionKey, entry.indexedEntryClustering);
}
protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index fb97ca5..5f27d82 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -35,14 +35,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
import org.apache.cassandra.config.ColumnDefinition.ClusteringOrder;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.view.View;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -75,6 +75,7 @@ public final class SchemaKeyspace
public static final String AGGREGATES = "aggregates";
public static final String INDEXES = "indexes";
+
public static final List<String> ALL =
ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
@@ -155,6 +156,7 @@ public final class SchemaKeyspace
+ "view_name text,"
+ "base_table_id uuid,"
+ "base_table_name text,"
+ + "where_clause text,"
+ "bloom_filter_fp_chance double,"
+ "caching frozen<map<text, text>>,"
+ "comment text,"
@@ -1311,6 +1313,7 @@ public final class SchemaKeyspace
builder.add("include_all_columns", view.includeAllColumns)
.add("base_table_id", view.baseTableId)
.add("base_table_name", view.baseTableMetadata().cfName)
+ .add("where_clause", view.whereClause)
.add("id", table.cfId);
addTableParamsToSchemaMutation(table.params, builder);
@@ -1426,7 +1429,9 @@ public final class SchemaKeyspace
String view = row.getString("view_name");
UUID id = row.getUUID("id");
UUID baseTableId = row.getUUID("base_table_id");
+ String baseTableName = row.getString("base_table_name");
boolean includeAll = row.getBoolean("include_all_columns");
+ String whereClause = row.getString("where_clause");
List<ColumnDefinition> columns =
readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition);
@@ -1447,7 +1452,10 @@ public final class SchemaKeyspace
.params(createTableParamsFromRow(row))
.droppedColumns(droppedColumns);
- return new ViewDefinition(keyspace, view, baseTableId, includeAll, cfm);
+ String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
+ SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+
+ return new ViewDefinition(keyspace, view, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a4253b6/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 61e4fc2..e92563b 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import com.datastax.driver.core.*;
import com.datastax.driver.core.ResultSet;
@@ -790,6 +791,83 @@ public abstract class CQLTester
Assert.assertTrue(String.format("Got %s rows than expected. Expected %d but got %d", rows.length>i ? "less" : "more", rows.length, i), i == rows.length);
}
+ /**
+ * Like assertRows(), but ignores the ordering of rows.
+ */
+ public static void assertRowsIgnoringOrder(UntypedResultSet result, Object[]... rows)
+ {
+ if (result == null)
+ {
+ if (rows.length > 0)
+ Assert.fail(String.format("No rows returned by query but %d expected", rows.length));
+ return;
+ }
+
+ List<ColumnSpecification> meta = result.metadata();
+
+ Set<List<ByteBuffer>> expectedRows = new HashSet<>(rows.length);
+ for (Object[] expected : rows)
+ {
+ Assert.assertEquals("Invalid number of (expected) values provided for row", expected.length, meta.size());
+ List<ByteBuffer> expectedRow = new ArrayList<>(meta.size());
+ for (int j = 0; j < meta.size(); j++)
+ expectedRow.add(makeByteBuffer(expected[j], meta.get(j).type));
+ expectedRows.add(expectedRow);
+ }
+
+ Set<List<ByteBuffer>> actualRows = new HashSet<>(result.size());
+ for (UntypedResultSet.Row actual : result)
+ {
+ List<ByteBuffer> actualRow = new ArrayList<>(meta.size());
+ for (int j = 0; j < meta.size(); j++)
+ actualRow.add(actual.getBytes(meta.get(j).name.toString()));
+ actualRows.add(actualRow);
+ }
+
+ com.google.common.collect.Sets.SetView<List<ByteBuffer>> extra = com.google.common.collect.Sets.difference(actualRows, expectedRows);
+ com.google.common.collect.Sets.SetView<List<ByteBuffer>> missing = com.google.common.collect.Sets.difference(expectedRows, actualRows);
+ if (!extra.isEmpty() || !missing.isEmpty())
+ {
+ List<String> extraRows = makeRowStrings(extra, meta);
+ List<String> missingRows = makeRowStrings(missing, meta);
+ StringBuilder sb = new StringBuilder();
+ if (!extra.isEmpty())
+ {
+ sb.append("Got ").append(extra.size()).append(" extra row(s) ");
+ if (!missing.isEmpty())
+ sb.append("and ").append(missing.size()).append(" missing row(s) ");
+ sb.append("in result. Extra rows:\n ");
+ sb.append(extraRows.stream().collect(Collectors.joining("\n ")));
+ if (!missing.isEmpty())
+ sb.append("\nMissing Rows:\n ").append(missingRows.stream().collect(Collectors.joining("\n ")));
+ Assert.fail(sb.toString());
+ }
+
+ if (!missing.isEmpty())
+ Assert.fail("Missing " + missing.size() + " row(s) in result: \n " + missingRows.stream().collect(Collectors.joining("\n ")));
+ }
+
+ assert expectedRows.size() == actualRows.size();
+ }
+
+ private static List<String> makeRowStrings(Iterable<List<ByteBuffer>> rows, List<ColumnSpecification> meta)
+ {
+ List<String> strings = new ArrayList<>();
+ for (List<ByteBuffer> row : rows)
+ {
+ StringBuilder sb = new StringBuilder("row(");
+ for (int j = 0; j < row.size(); j++)
+ {
+ ColumnSpecification column = meta.get(j);
+ sb.append(column.name.toString()).append("=").append(formatValue(row.get(j), column.type));
+ if (j < (row.size() - 1))
+ sb.append(", ");
+ }
+ strings.add(sb.append(")").toString());
+ }
+ return strings;
+ }
+
protected void assertRowCount(UntypedResultSet result, int numExpectedRows)
{
if (result == null)