Posted to commits@cassandra.apache.org by if...@apache.org on 2019/02/25 16:01:57 UTC

[cassandra] branch cassandra-3.0 updated (b27cc37 -> 127cf8f)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from b27cc37  Merge branch 'cassandra-2.2' into cassandra-3.0
     new e0c10fd  Implement simple In-JVM tooling for testing distributed paging
     new 127cf8f  Merge branch 'cassandra-2.2' into cassandra-3.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/cql3/UntypedResultSet.java    |  82 ++++++++++++++
 .../cassandra/distributed/api/ICoordinator.java    |   4 +
 .../cassandra/distributed/impl/Coordinator.java    |  78 +++++++++++--
 .../apache/cassandra/distributed/impl/RowUtil.java |  26 +++++
 .../test/DistributedReadWritePathTest.java         | 122 ++++++++++++++++++++-
 .../distributed/test/DistributedTestBase.java      |  40 ++++++-
 6 files changed, 341 insertions(+), 11 deletions(-)
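
For readers skimming this email: the headline addition is a paged-read entry
point, ICoordinator#executeWithPaging, driven from in-JVM dtests. A minimal
sketch of how a test exercises it (assuming the dtest helpers used by the
tests in the diff below, i.e. Cluster, init() and KEYSPACE from
DistributedTestBase; the inserted row and the page size of 10 are
illustrative, not from the commit):

    // Sketch only, not part of the commit: drives the new paged-read API.
    // Assumes the usual dtest imports (org.apache.cassandra.distributed.*,
    // java.util.Arrays, java.util.Iterator).
    @Test
    public void pagedReadSketch() throws Throwable
    {
        try (Cluster cluster = init(Cluster.create(3)))
        {
            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
                                           ConsistencyLevel.QUORUM);
            Iterator<Object[]> rows = cluster.coordinator(1)
                                             .executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
                                                                ConsistencyLevel.QUORUM, 10);
            while (rows.hasNext())
                System.out.println(Arrays.toString(rows.next()));
        }
    }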


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 127cf8ffbff326f0b714f216208649ac6117dbb1
Merge: b27cc37 e0c10fd
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Feb 25 16:29:14 2019 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 .../apache/cassandra/cql3/UntypedResultSet.java    |  82 ++++++++++++++
 .../cassandra/distributed/api/ICoordinator.java    |   4 +
 .../cassandra/distributed/impl/Coordinator.java    |  78 +++++++++++--
 .../apache/cassandra/distributed/impl/RowUtil.java |  26 +++++
 .../test/DistributedReadWritePathTest.java         | 122 ++++++++++++++++++++-
 .../distributed/test/DistributedTestBase.java      |  40 ++++++-
 6 files changed, 341 insertions(+), 11 deletions(-)

diff --cc src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index ada1e0f,e8d610d..dc4237d
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@@ -22,18 -22,11 +22,21 @@@ import java.net.InetAddress
  import java.nio.ByteBuffer;
  import java.util.*;
  
 -import com.google.common.collect.AbstractIterator;
++import com.google.common.annotations.VisibleForTesting;
+ 
++import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.utils.AbstractIterator;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
  import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.service.pager.QueryPager;
 +import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.utils.FBUtilities;
  
  /** a utility for doing internal cql-based queries */
  public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
@@@ -53,6 -46,6 +56,20 @@@
          return new FromPager(select, pager, pageSize);
      }
  
++    /**
++     * This method is intended for testing purposes, since it executes the query
++     * on the cluster rather than only on the local node.
++     */
++    @VisibleForTesting
++    public static UntypedResultSet create(SelectStatement select,
++                                          ConsistencyLevel cl,
++                                          ClientState clientState,
++                                          QueryPager pager,
++                                          int pageSize)
++    {
++        return new FromDistributedPager(select, cl, clientState, pager, pageSize);
++    }
++
      public boolean isEmpty()
      {
          return size() == 0;
@@@ -106,6 -99,6 +123,71 @@@
          }
      }
  
++    /**
++     * Result set backed by a pager that calls `execute` rather than `executeInternal`
++     */
++    private static class FromDistributedPager extends UntypedResultSet
++    {
++        private final SelectStatement select;
++        private final ConsistencyLevel cl;
++        private final ClientState clientState;
++        private final QueryPager pager;
++        private final int pageSize;
++        private final List<ColumnSpecification> metadata;
++
++        private FromDistributedPager(SelectStatement select,
++                                     ConsistencyLevel cl,
++                                     ClientState clientState,
++                                     QueryPager pager, int pageSize)
++        {
++            this.select = select;
++            this.cl = cl;
++            this.clientState = clientState;
++            this.pager = pager;
++            this.pageSize = pageSize;
++            this.metadata = select.getResultMetadata().requestNames();
++        }
++
++        public int size()
++        {
++            throw new UnsupportedOperationException();
++        }
++
++        public Row one()
++        {
++            throw new UnsupportedOperationException();
++        }
++
++        public Iterator<Row> iterator()
++        {
++            return new AbstractIterator<Row>()
++            {
++                private Iterator<List<ByteBuffer>> currentPage;
++
++                protected Row computeNext()
++                {
++                    int nowInSec = FBUtilities.nowInSeconds();
++                    while (currentPage == null || !currentPage.hasNext())
++                    {
++                        if (pager.isExhausted())
++                            return endOfData();
++
++                        try (PartitionIterator iter = pager.fetchPage(pageSize, cl, clientState))
++                        {
++                            currentPage = select.process(iter, nowInSec).rows.iterator();
++                        }
++                    }
++                    return new Row(metadata, currentPage.next());
++                }
++            };
++        }
++
++        public List<ColumnSpecification> metadata()
++        {
++            return metadata;
++        }
++    }
++
      private static class FromResultList extends UntypedResultSet
      {
          private final List<Map<String, ByteBuffer>> cqlRows;
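
A sketch of how the new create overload above gets wired up; this mirrors
Coordinator#executeWithPaging later in this email. Here select, options, cl,
clientState and pageSize are assumed to be in scope, and process() is a
placeholder:

    // Sketch: wiring a cluster-wide pager into an UntypedResultSet.
    QueryPager pager = select.getQuery(options, FBUtilities.nowInSeconds())
                             .getPager(null, Server.CURRENT_VERSION);
    UntypedResultSet rs = UntypedResultSet.create(select, cl, clientState, pager, pageSize);
    for (UntypedResultSet.Row row : rs)
        process(row); // each page is fetched via pager.fetchPage(pageSize, cl, clientState)
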
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 655ceb8,96b0dd1..c27523f
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -18,8 -18,9 +18,10 @@@
  
  package org.apache.cassandra.distributed.impl;
  
++import java.net.InetSocketAddress;
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
+ import java.util.Iterator;
  import java.util.List;
  
  import org.apache.cassandra.cql3.CQLStatement;
@@@ -29,9 -32,12 +33,11 @@@ import org.apache.cassandra.db.Consiste
  import org.apache.cassandra.distributed.api.ICoordinator;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
 -import org.apache.cassandra.service.pager.Pageable;
+ import org.apache.cassandra.service.pager.QueryPager;
 -import org.apache.cassandra.service.pager.QueryPagers;
  import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.FBUtilities;
  
  public class Coordinator implements ICoordinator
  {
@@@ -46,21 -52,22 +52,22 @@@
      {
          return instance.sync(() -> {
              ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
--            CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement;
++            ClientState clientState = makeFakeClientState();
++            CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
              List<ByteBuffer> boundBBValues = new ArrayList<>();
              for (Object boundValue : boundValues)
              {
                  boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
              }
  
 -            prepared.validate(QueryState.forInternalCalls().getClientState());
              ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
-                     QueryOptions.create(consistencyLevel,
-                             boundBBValues,
-                             false,
-                             10,
-                             null,
-                             null,
-                             Server.CURRENT_VERSION));
+                                                  QueryOptions.create(consistencyLevel,
+                                                                      boundBBValues,
+                                                                      false,
+                                                                      Integer.MAX_VALUE,
+                                                                      null,
+                                                                      null,
+                                                                      Server.CURRENT_VERSION));
  
              if (res != null && res.kind == ResultMessage.Kind.ROWS)
              {
@@@ -72,4 -79,56 +79,59 @@@
              }
          }).call();
      }
+ 
+     @Override
+     public Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevelOrigin, int pageSize, Object... boundValues)
+     {
+         if (pageSize <= 0)
+             throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
+ 
+         return instance.sync(() -> {
+             ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
 -            CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement;
++            ClientState clientState = makeFakeClientState();
++            CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
+             List<ByteBuffer> boundBBValues = new ArrayList<>();
+             for (Object boundValue : boundValues)
+             {
+                 boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
+             }
+ 
+             prepared.validate(QueryState.forInternalCalls().getClientState());
+             assert prepared instanceof SelectStatement : "Only SELECT statements can be executed with paging";
+ 
 -            ClientState clientState = QueryState.forInternalCalls().getClientState();
+             SelectStatement selectStatement = (SelectStatement) prepared;
 -            QueryOptions queryOptions = QueryOptions.create(consistencyLevel,
 -                                                            boundBBValues,
 -                                                            false,
 -                                                            pageSize,
 -                                                            null,
 -                                                            null,
 -                                                            Server.CURRENT_VERSION);
 -            Pageable pageable = selectStatement.getPageableCommand(queryOptions);
++
++            QueryPager pager = selectStatement.getQuery(QueryOptions.create(consistencyLevel,
++                                                                            boundBBValues,
++                                                                            false,
++                                                                            pageSize,
++                                                                            null,
++                                                                            null,
++                                                                            Server.CURRENT_VERSION),
++                                                        FBUtilities.nowInSeconds())
++                                     .getPager(null, Server.CURRENT_VERSION);
+ 
+             // Usually the pager fetches a single page (see SelectStatement#execute); here we need
+             // to iterate over all of the results lazily.
 -            QueryPager pager = QueryPagers.pager(pageable, consistencyLevel, clientState, null);
 -            Iterator<Object[]> iter = RowUtil.toObjects(selectStatement.getResultMetadata().names,
 -                                                        UntypedResultSet.create(selectStatement,
 -                                                                                pager,
 -                                                                                pageSize).iterator());
 -
 -            // We have to make sure iterator is not running on main thread.
+             return new Iterator<Object[]>() {
++                Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, consistencyLevel, clientState, pager, pageSize));
++
+                 public boolean hasNext()
+                 {
++                    // We have to make sure the iterator is not running on the main thread.
+                     return instance.sync(() -> iter.hasNext()).call();
+                 }
+ 
+                 public Object[] next()
+                 {
+                     return instance.sync(() -> iter.next()).call();
+                 }
+             };
+         }).call();
+     }
++
++    private static ClientState makeFakeClientState()
++    {
++        return ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getLocalAddress(), 9042));
++    }
  }
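
Worth noting about the iterator returned above: the statement is prepared and
the pager built inside one instance.sync() call, and every hasNext()/next() is
dispatched through instance.sync() again, so page fetches run on the instance
side rather than on the caller's thread. From the test's side it is consumed
as a plain iterator; a sketch (the table, bound value and page size of 5 are
illustrative):

    // Sketch: consuming the paged iterator; each call hops to the instance.
    // Assumes java.util.{ArrayList,Iterator,List} and a dtest cluster in scope.
    Iterator<Object[]> it = cluster.coordinator(1)
                                   .executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
                                                      ConsistencyLevel.QUORUM, 5, 1);
    List<Object[]> collected = new ArrayList<>();
    while (it.hasNext())
        collected.add(it.next());
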
diff --cc test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index c3b129b,d84c4a9..e83b85e
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@@ -44,4 -48,20 +48,26 @@@ public class RowUti
          }
          return result;
      }
+ 
++    public static Iterator<Object[]> toObjects(UntypedResultSet rs)
++    {
++        return toObjects(rs.metadata(), rs.iterator());
++    }
++
+     public static Iterator<Object[]> toObjects(List<ColumnSpecification> columnSpecs, Iterator<UntypedResultSet.Row> rs)
+     {
+         return Iterators.transform(rs,
+                                    (row) -> {
+                                        Object[] objectRow = new Object[columnSpecs.size()];
+                                        for (int i = 0; i < columnSpecs.size(); i++)
+                                        {
+                                            ColumnSpecification columnSpec = columnSpecs.get(i);
+                                            ByteBuffer bb = row.getBytes(columnSpec.name.toString());
++
+                                            if (bb != null)
+                                                objectRow[i] = columnSpec.type.getSerializer().deserialize(bb);
+                                        }
+                                        return objectRow;
+                                    });
+     }
  }
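
The new single-argument toObjects above is a thin convenience that pairs a
result set with its own column metadata; the two forms are equivalent (sketch,
where rs is any UntypedResultSet that carries metadata, e.g. the
FromDistributedPager added earlier in this email):

    // Sketch: these two calls produce the same deserialized rows.
    Iterator<Object[]> viaOverload = RowUtil.toObjects(rs);
    Iterator<Object[]> explicit    = RowUtil.toObjects(rs.metadata(), rs.iterator());
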
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
index 75131b3,547e41c..3578b2c
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
@@@ -91,62 -90,121 +91,180 @@@ public class DistributedReadWritePathTe
      }
  
      @Test
 +    public void writeWithSchemaDisagreement() throws Throwable
 +    {
 +        try (Cluster cluster = init(Cluster.create(3)))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
 +
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +
 +            // Introduce schema disagreement
 +            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +
 +            Exception thrown = null;
 +            try
 +            {
 +                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
 +                                              ConsistencyLevel.QUORUM);
 +            }
 +            catch (RuntimeException e)
 +            {
 +                thrown = e;
 +            }
 +
 +            Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 +            Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
 +        }
 +    }
 +
 +    @Test
 +    public void readWithSchemaDisagreement() throws Throwable
 +    {
 +        try (Cluster cluster = init(Cluster.create(3)))
 +        {
 +            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
 +
 +            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
 +
 +            // Introduce schema disagreement
 +            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 +
 +            Exception thrown = null;
 +            try
 +            {
 +                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 +                                                         ConsistencyLevel.ALL),
 +                           row(1, 1, 1, null));
 +            }
 +            catch (Exception e)
 +            {
 +                thrown = e;
 +            }
 +            Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 +            Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
 +        }
 +    }
++
++    @Test
+     public void simplePagedReadsTest() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             int size = 100;
+             Object[][] results = new Object[size][];
+             for (int i = 0; i < size; i++)
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                ConsistencyLevel.QUORUM,
+                                                i, i);
+                 results[i] = new Object[] { 1, i, i};
+             }
+ 
+             // First, make sure that non-paged reads are able to fetch the results
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.QUORUM),
+                        results);
+ 
+             // Make sure paged reads return the same results across different page sizes
+             for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+             {
+                 assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                     ConsistencyLevel.QUORUM,
+                                                                     pageSize),
+                            results);
+             }
+         }
+     }
+ 
+     @Test
+     public void pagingWithRepairTest() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            int size = 10;
++            int size = 100;
+             Object[][] results = new Object[size][];
+             for (int i = 0; i < size; i++)
+             {
+                 // Make sure that data lands on different nodes, not on the coordinator
+                 cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+                                                                 i, i);
+ 
+                 results[i] = new Object[] { 1, i, i};
+             }
+ 
+             // Make sure paged reads return the same results across different page sizes
+             for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+             {
+                 assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+                                                                     ConsistencyLevel.ALL,
+                                                                     pageSize),
+                            results);
+             }
+ 
+             assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+                        results);
+         }
+     }
+ 
+     @Test
+     public void pagingTests() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3));
+              Cluster singleNode = init(Cluster.create(1)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+             singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             for (int i = 0; i < 10; i++)
+             {
+                 for (int j = 0; j < 10; j++)
+                 {
+                     cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                                    ConsistencyLevel.QUORUM,
+                                                    i, j, i + i);
+                     singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                                       ConsistencyLevel.QUORUM,
+                                                       i, j, i + i);
+                 }
+             }
+ 
+             int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
+             String[] statements = new String[] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+                                                 "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+                                                 "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
+                                                 "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
+                                                 "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+             };
+             for (String statement : statements)
+             {
+                 for (int pageSize : pageSizes)
+                 {
+                     assertRows(cluster.coordinator(1)
+                                       .executeWithPaging(statement,
+                                                          ConsistencyLevel.QUORUM, pageSize),
+                                singleNode.coordinator(1)
+                                          .executeWithPaging(statement,
+                                                             ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
+                 }
+             }
+         }
+     }
  }

