You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:21:57 UTC

[07/17] cassandra git commit: Merge commit '017ec3e99e704db5e1a36ad153af08d6e7eca523' into cassandra-2.2

Merge commit '017ec3e99e704db5e1a36ad153af08d6e7eca523' into cassandra-2.2

* commit '017ec3e99e704db5e1a36ad153af08d6e7eca523':
  Avoid stalling Paxos when the paxos state expires


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6555a87b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6555a87b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6555a87b

Branch: refs/heads/cassandra-3.0
Commit: 6555a87bde4daeb8bd5d9558595a367ec6bc061d
Parents: 3b448b3 017ec3e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jun 28 15:17:40 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:18:27 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 28 +++++++++++++++++++-
 .../cql3/statements/SelectStatement.java        |  6 ++++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  6 ++---
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../cassandra/service/paxos/PaxosState.java     | 11 ++++++--
 .../service/paxos/PrepareCallback.java          | 18 ++++++++++++-
 7 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8d2062d,feeaded..9f42d98
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,5 +1,36 @@@
 -2.1.15
 +2.2.7
 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
 + * Validate bloom_filter_fp_chance against lowest supported
 +   value when the table is created (CASSANDRA-11920)
 + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
   * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
   * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
   * Prevent select statements with clustering key > 64k (CASSANDRA-11882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index fa82fa7,4340d42..c702679
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -331,15 -354,52 +331,41 @@@ public class QueryProcessor implements 
       */
      public static UntypedResultSet executeOnceInternal(String query, Object... values)
      {
 -        try
 -        {
 -            ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState());
 -            prepared.statement.validate(internalQueryState().getClientState());
 -            ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values));
 -            if (result instanceof ResultMessage.Rows)
 -                return UntypedResultSet.create(((ResultMessage.Rows)result).result);
 -            else
 -                return null;
 -        }
 -        catch (RequestExecutionException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        catch (RequestValidationException e)
 -        {
 -            throw new RuntimeException("Error validating query " + query, e);
 -        }
 +        ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState());
 +        prepared.statement.validate(internalQueryState().getClientState());
 +        ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values));
 +        if (result instanceof ResultMessage.Rows)
 +            return UntypedResultSet.create(((ResultMessage.Rows)result).result);
 +        else
 +            return null;
      }
  
+     /**
+      * A special version of executeInternal that takes the time to use as "now" for the query as an argument.
+      * Note that this only makes sense for SELECTs, so this only accepts SELECT statements and is only useful in rare
+      * cases.
+      */
+     public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
+     {
+         try
+         {
+             ParsedStatement.Prepared prepared = prepareInternal(query);
+             assert prepared.statement instanceof SelectStatement;
+             SelectStatement select = (SelectStatement)prepared.statement;
+             ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
+             assert result instanceof ResultMessage.Rows;
+             return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+         }
+         catch (RequestExecutionException e)
+         {
+             throw new RuntimeException(e);
+         }
+         catch (RequestValidationException e)
+         {
+             throw new RuntimeException("Error validating query " + query, e);
+         }
+     }
+ 
      public static UntypedResultSet resultify(String query, Row row)
      {
          return resultify(query, Collections.singletonList(row));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 20fe982,6351bb5..8820ff7
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -291,10 -312,17 +291,14 @@@ public class SelectStatement implement
  
      public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
      {
+         return executeInternal(state, options, System.currentTimeMillis());
+     }
+ 
+     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
+     {
          int limit = getLimit(options);
-         long now = System.currentTimeMillis();
          Pageable command = getPageableCommand(options, limit, now);
 -
 -        int pageSize = options.getPageSize();
 -        if (parameters.isCount && pageSize <= 0)
 -            pageSize = DEFAULT_COUNT_PAGE_SIZE;
 +        int pageSize = getPageSize(options);
  
          if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 74a3c7b,f8cf1ab..e0d5f66
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -880,24 -796,110 +880,24 @@@ public final class SystemKeyspac
      }
  
      /**
 -     * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns)
 -     * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily)
 +     * Gets the stored data center for the local node, or null if none have been set yet.
       */
 -    public static List<Row> serializedSchema(String schemaCfName)
 -    {
 -        Token minToken = StorageService.getPartitioner().getMinimumToken();
 -
 -        return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
 -                                                     null,
 -                                                     new IdentityQueryFilter(),
 -                                                     Integer.MAX_VALUE,
 -                                                     System.currentTimeMillis());
 -    }
 -
 -    public static Collection<Mutation> serializeSchema()
 +    public static String getDatacenter()
      {
 -        Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
 +        String req = "SELECT data_center FROM system.%s WHERE key='%s'";
 +        UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL));
  
 -        for (String cf : allSchemaCfs)
 -            serializeSchema(mutationMap, cf);
 +        // Look up the Data center (return it if found)
 +        if (!result.isEmpty() && result.one().has("data_center"))
 +            return result.one().getString("data_center");
  
 -        return mutationMap.values();
 -    }
 -
 -    private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName)
 -    {
 -        for (Row schemaRow : serializedSchema(schemaCfName))
 -        {
 -            if (Schema.ignoredSchemaRow(schemaRow))
 -                continue;
 -
 -            Mutation mutation = mutationMap.get(schemaRow.key);
 -            if (mutation == null)
 -            {
 -                mutation = new Mutation(Keyspace.SYSTEM_KS, schemaRow.key.getKey());
 -                mutationMap.put(schemaRow.key, mutation);
 -            }
 -
 -            mutation.add(schemaRow.cf);
 -        }
 -    }
 -
 -    public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces)
 -    {
 -        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
 -
 -        for (String keyspace : keyspaces)
 -        {
 -            Row schemaEntity = readSchemaRow(schemaCfName, keyspace);
 -            if (schemaEntity.cf != null)
 -                schema.put(schemaEntity.key, schemaEntity.cf);
 -        }
 -
 -        return schema;
 -    }
 -
 -    public static ByteBuffer getSchemaKSKey(String ksName)
 -    {
 -        return AsciiType.instance.fromString(ksName);
 -    }
 -
 -    /**
 -     * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace.
 -     *
 -     * @param schemaCfName the schema table to get the data from (schema_keyspaces, schema_columnfamilies, schema_columns or schema_triggers)
 -     * @param ksName the keyspace of the tables we are interested in
 -     * @return a Row containing the schema data of a particular type for the keyspace
 -     */
 -    public static Row readSchemaRow(String schemaCfName, String ksName)
 -    {
 -        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 -
 -        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
 -        ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, schemaCfName, System.currentTimeMillis()));
 -
 -        return new Row(key, result);
 -    }
 -
 -    /**
 -     * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair.
 -     *
 -     * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers)
 -     * @param ksName the keyspace of the table we are interested in
 -     * @param cfName the table we are interested in
 -     * @return a Row containing the schema data of a particular type for the table
 -     */
 -    public static Row readSchemaRow(String schemaCfName, String ksName, String cfName)
 -    {
 -        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
 -        ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName);
 -        Composite prefix = schemaCFS.getComparator().make(cfName);
 -        ColumnFamily cf = schemaCFS.getColumnFamily(key,
 -                                                    prefix,
 -                                                    prefix.end(),
 -                                                    false,
 -                                                    Integer.MAX_VALUE,
 -                                                    System.currentTimeMillis());
 -        return new Row(key, cf);
 +        return null;
      }
  
-     public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+     public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
      {
          String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-         UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId);
 -        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS_CF), key, metadata.cfId);
++        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS), key, metadata.cfId);
          if (results.isEmpty())
              return new PaxosState(key, metadata);
          UntypedResultSet.Row row = results.one();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6555a87b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------