You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/02/25 16:01:59 UTC

[cassandra] branch cassandra-3.11 updated (7b462ec -> 093fe7d)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 7b462ec  Merge branch 'cassandra-3.0' into cassandra-3.11
     new e0c10fd  Implement simple In-JVM tooling for testing distributed paging
     new 127cf8f  Merge branch 'cassandra-2.2' into cassandra-3.0
     new 093fe7d  Merge branch 'cassandra-3.0' into cassandra-3.11

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/cql3/UntypedResultSet.java    |  82 ++++++++++++++
 .../cassandra/distributed/api/ICoordinator.java    |   4 +
 .../cassandra/distributed/impl/Coordinator.java    |  66 ++++++++++-
 .../apache/cassandra/distributed/impl/RowUtil.java |  26 +++++
 .../test/DistributedReadWritePathTest.java         | 122 ++++++++++++++++++++-
 .../distributed/test/DistributedTestBase.java      |  40 ++++++-
 6 files changed, 335 insertions(+), 5 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 093fe7dfe92db4e756f047920d3e73d0bbd9dff4
Merge: 7b462ec 127cf8f
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Feb 25 16:30:36 2019 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 .../apache/cassandra/cql3/UntypedResultSet.java    |  82 ++++++++++++++
 .../cassandra/distributed/api/ICoordinator.java    |   4 +
 .../cassandra/distributed/impl/Coordinator.java    |  66 ++++++++++-
 .../apache/cassandra/distributed/impl/RowUtil.java |  26 +++++
 .../test/DistributedReadWritePathTest.java         | 122 ++++++++++++++++++++-
 .../distributed/test/DistributedTestBase.java      |  40 ++++++-
 6 files changed, 335 insertions(+), 5 deletions(-)

diff --cc src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index d896071,dc4237d..c551d42
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@@ -22,7 -22,9 +22,10 @@@ import java.net.InetAddress
  import java.nio.ByteBuffer;
  import java.util.*;
  
+ import com.google.common.annotations.VisibleForTesting;
+ 
+ import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.AbstractIterator;
  
  import org.apache.cassandra.config.CFMetaData;
@@@ -106,6 -123,71 +123,71 @@@ public abstract class UntypedResultSet 
          }
      }
  
+     /**
+      * Pager that calls `execute` rather than `executeInternal`
+      */
+     private static class FromDistributedPager extends UntypedResultSet
+     {
+         private final SelectStatement select;
+         private final ConsistencyLevel cl;
+         private final ClientState clientState;
+         private final QueryPager pager;
+         private final int pageSize;
+         private final List<ColumnSpecification> metadata;
+ 
+         private FromDistributedPager(SelectStatement select,
+                                      ConsistencyLevel cl,
+                                      ClientState clientState,
+                                      QueryPager pager, int pageSize)
+         {
+             this.select = select;
+             this.cl = cl;
+             this.clientState = clientState;
+             this.pager = pager;
+             this.pageSize = pageSize;
+             this.metadata = select.getResultMetadata().requestNames();
+         }
+ 
+         public int size()
+         {
+             throw new UnsupportedOperationException();
+         }
+ 
+         public Row one()
+         {
+             throw new UnsupportedOperationException();
+         }
+ 
+         public Iterator<Row> iterator()
+         {
+             return new AbstractIterator<Row>()
+             {
+                 private Iterator<List<ByteBuffer>> currentPage;
+ 
+                 protected Row computeNext()
+                 {
+                     int nowInSec = FBUtilities.nowInSeconds();
+                     while (currentPage == null || !currentPage.hasNext())
+                     {
+                         if (pager.isExhausted())
+                             return endOfData();
+ 
 -                        try (PartitionIterator iter = pager.fetchPage(pageSize, cl, clientState))
++                        try (PartitionIterator iter = pager.fetchPage(pageSize, cl, clientState, System.nanoTime()))
+                         {
+                             currentPage = select.process(iter, nowInSec).rows.iterator();
+                         }
+                     }
+                     return new Row(metadata, currentPage.next());
+                 }
+             };
+         }
+ 
+         public List<ColumnSpecification> metadata()
+         {
+             return metadata;
+         }
+     }
+ 
      private static class FromResultList extends UntypedResultSet
      {
          private final List<Map<String, ByteBuffer>> cqlRows;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 038648c,c27523f..ec5e58f
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -29,9 -33,11 +33,11 @@@ import org.apache.cassandra.db.Consiste
  import org.apache.cassandra.distributed.api.ICoordinator;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
+ import org.apache.cassandra.service.pager.QueryPager;
 -import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
  
  public class Coordinator implements ICoordinator
  {
@@@ -57,11 -64,10 +64,11 @@@
                                                   QueryOptions.create(consistencyLevel,
                                                                       boundBBValues,
                                                                       false,
-                                                                      10,
+                                                                      Integer.MAX_VALUE,
                                                                       null,
                                                                       null,
 -                                                                     Server.CURRENT_VERSION));
 +                                                                     ProtocolVersion.CURRENT),
 +                                                  System.nanoTime());
  
              if (res != null && res.kind == ResultMessage.Kind.ROWS)
              {
@@@ -73,4 -79,59 +80,59 @@@
              }
          }).call();
      }
+ 
+     @Override
+     public Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevelOrigin, int pageSize, Object... boundValues)
+     {
+         if (pageSize <= 0)
+             throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
+ 
+         return instance.sync(() -> {
+             ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(consistencyLevelOrigin.name());
+             ClientState clientState = makeFakeClientState();
+             CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
+             List<ByteBuffer> boundBBValues = new ArrayList<>();
+             for (Object boundValue : boundValues)
+             {
+                 boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
+             }
+ 
+             prepared.validate(QueryState.forInternalCalls().getClientState());
+             assert prepared instanceof SelectStatement : "Only SELECT statements can be executed with paging";
+ 
+             SelectStatement selectStatement = (SelectStatement) prepared;
+ 
+             QueryPager pager = selectStatement.getQuery(QueryOptions.create(consistencyLevel,
+                                                                             boundBBValues,
+                                                                             false,
+                                                                             pageSize,
+                                                                             null,
+                                                                             null,
 -                                                                            Server.CURRENT_VERSION),
++                                                                            ProtocolVersion.CURRENT),
+                                                         FBUtilities.nowInSeconds())
 -                                     .getPager(null, Server.CURRENT_VERSION);
++                                              .getPager(null, ProtocolVersion.CURRENT);
+ 
+             // Usually the pager fetches a single page (see SelectStatement#execute). We need to iterate over
+             // all of the results lazily.
+             return new Iterator<Object[]>() {
+                 Iterator<Object[]> iter = RowUtil.toObjects(UntypedResultSet.create(selectStatement, consistencyLevel, clientState, pager,  pageSize));
+ 
+                 public boolean hasNext()
+                 {
+                     // We have to make sure the iterator is not running on the main thread.
+                     return instance.sync(() -> iter.hasNext()).call();
+                 }
+ 
+                 public Object[] next()
+                 {
+                     return instance.sync(() -> iter.next()).call();
+                 }
+             };
+         }).call();
+     }
+ 
+     private static final ClientState makeFakeClientState()
+     {
+         return ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getLocalAddress(), 9042));
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org