You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/12/13 10:37:51 UTC

[15/19] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8bbe2f59
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8bbe2f59
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8bbe2f59

Branch: refs/heads/cassandra-3.11
Commit: 8bbe2f5951be9ddfd74e455ba20a09b3dc309cfe
Parents: 88c5956 36ce4e0
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Dec 13 10:21:58 2016 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Tue Dec 13 10:29:08 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/jvm.options                                |   5 +
 src/java/org/apache/cassandra/db/Keyspace.java  |  23 ----
 .../cassandra/index/SecondaryIndexManager.java  | 109 ++++++++++++----
 .../internal/CollatedViewIndexBuilder.java      |   3 +-
 .../apache/cassandra/index/CustomIndexTest.java | 130 ++++++++++++++++++-
 6 files changed, 222 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7413086,a65a147..2156ab9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -165,11 -55,6 +165,12 @@@ Merged from 3.0
   * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
   * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
  Merged from 2.2:
++ * Reduce granularity of OpOrder.Group during index build (CASSANDRA-12796)
 + * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
 + * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
 + * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
 + * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
 + * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/conf/jvm.options
----------------------------------------------------------------------
diff --cc conf/jvm.options
index 0e329d6,a7b3bd8..f91466a
--- a/conf/jvm.options
+++ b/conf/jvm.options
@@@ -8,138 -8,6 +8,143 @@@
  # - dynamic flags will be appended to these on cassandra-env              #
  ###########################################################################
  
 +######################
 +# STARTUP PARAMETERS #
 +######################
 +
 +# Uncomment any of the following properties to enable specific startup parameters
 +
 +# In a multi-instance deployment, multiple Cassandra instances will independently assume that all
 +# CPU processors are available to them. This setting allows you to specify a smaller set of processors
 +# and perhaps have affinity.
 +#-Dcassandra.available_processors=number_of_processors
 +
 +# The directory location of the cassandra.yaml file.
 +#-Dcassandra.config=directory
 +
 +# Sets the initial partitioner token for a node the first time the node is started.
 +#-Dcassandra.initial_token=token
 +
 +# Set to false to start Cassandra on a node but not have the node join the cluster.
 +#-Dcassandra.join_ring=true|false
 +
 +# Set to false to clear all gossip state for the node on restart. Use when you have changed node
 +# information in cassandra.yaml (such as listen_address).
 +#-Dcassandra.load_ring_state=true|false
 +
 +# Enable pluggable metrics reporter. See Pluggable metrics reporting in Cassandra 2.0.2.
 +#-Dcassandra.metricsReporterConfigFile=file
 +
 +# Set the port on which the CQL native transport listens for clients. (Default: 9042)
 +#-Dcassandra.native_transport_port=port
 +
 +# Overrides the partitioner. (Default: org.apache.cassandra.dht.Murmur3Partitioner)
 +#-Dcassandra.partitioner=partitioner
 +
 +# To replace a node that has died, restart a new node in its place specifying the address of the
 +# dead node. The new node must not have any data in its data directory, that is, it must be in the
 +# same state as before bootstrapping.
 +#-Dcassandra.replace_address=listen_address or broadcast_address of dead node
 +
 +# Allow restoring specific tables from an archived commit log.
 +#-Dcassandra.replayList=table
 +
 +# Allows overriding of the default RING_DELAY (30000ms), which is the amount of time a node waits
 +# before joining the ring.
 +#-Dcassandra.ring_delay_ms=ms
 +
 +# Set the port for the Thrift RPC service, which is used for client connections. (Default: 9160)
 +#-Dcassandra.rpc_port=port
 +
 +# Set the SSL port for encrypted communication. (Default: 7001)
 +#-Dcassandra.ssl_storage_port=port
 +
 +# Enable or disable the native transport server. See start_native_transport in cassandra.yaml.
 +#-Dcassandra.start_native_transport=true|false
 +
 +# Enable or disable the Thrift RPC server. (Default: true)
 +#-Dcassandra.start_rpc=true|false
 +
 +# Set the port for inter-node communication. (Default: 7000)
 +#-Dcassandra.storage_port=port
 +
 +# Set the default location for the trigger JARs. (Default: conf/triggers)
 +#-Dcassandra.triggers_dir=directory
 +
 +# For testing new compaction and compression strategies. It allows you to experiment with different
 +# strategies and benchmark write performance differences without affecting the production workload. 
 +#-Dcassandra.write_survey=true
 +
 +# To disable configuration via JMX of auth caches (such as those for credentials, permissions and
 +# roles). This will mean those config options can only be set (persistently) in cassandra.yaml
 +# and will require a restart for new values to take effect.
 +#-Dcassandra.disable_auth_caches_remote_configuration=true
 +
++# To disable dynamic calculation of the page size used when indexing an entire partition (during
++# initial index build/rebuild). If set to true, the page size will be fixed to the default of
++# 10000 rows per page.
++#-Dcassandra.force_default_indexing_page_size=true
++
 +########################
 +# GENERAL JVM SETTINGS #
 +########################
 +
 +# enable assertions. highly suggested for correct application functionality.
 +-ea
 +
 +# enable thread priorities, primarily so we can give periodic tasks
 +# a lower priority to avoid interfering with client workload
 +-XX:+UseThreadPriorities
 +
 +# allows lowering thread priority without being root on linux - probably
 +# not necessary on Windows but doesn't harm anything.
 +# see http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
 +-XX:ThreadPriorityPolicy=42
 +
 +# Enable heap-dump if there's an OOM
 +-XX:+HeapDumpOnOutOfMemoryError
 +
 +# Per-thread stack size.
 +-Xss256k
 +
 +# Larger interned string table, for gossip's benefit (CASSANDRA-6410)
 +-XX:StringTableSize=1000003
 +
 +# Make sure all memory is faulted and zeroed on startup.
 +# This helps prevent soft faults in containers and makes
 +# transparent hugepage allocation more effective.
 +-XX:+AlwaysPreTouch
 +
 +# Disable biased locking as it does not benefit Cassandra.
 +-XX:-UseBiasedLocking
 +
 +# Enable thread-local allocation blocks and allow the JVM to automatically
 +# resize them at runtime.
 +-XX:+UseTLAB
 +-XX:+ResizeTLAB
 +-XX:+UseNUMA
 +
 +# http://www.evanjones.ca/jvm-mmap-pause.html
 +-XX:+PerfDisableSharedMem
 +
 +# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
 +# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
 +# comment out this entry to enable IPv6 support).
 +-Djava.net.preferIPv4Stack=true
 +
 +### Debug options
 +
 +# uncomment to enable flight recorder
 +#-XX:+UnlockCommercialFeatures
 +#-XX:+FlightRecorder
 +
 +# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
 +#-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1414
 +
 +# uncomment to have Cassandra JVM log internal method compilation (developers only)
 +#-XX:+UnlockDiagnosticVMOptions
 +#-XX:+LogCompilation
 +
  #################
  # HEAP SETTINGS #
  #################

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 6e36511,003b624..ccb91c1
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -55,7 -56,10 +56,10 @@@ import org.apache.cassandra.index.trans
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.schema.IndexMetadata;
  import org.apache.cassandra.schema.Indexes;
+ import org.apache.cassandra.service.pager.SinglePartitionPager;
  import org.apache.cassandra.tracing.Tracing;
++import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.OpOrder;
  import org.apache.cassandra.utils.concurrent.Refs;
@@@ -536,35 -523,55 +543,55 @@@ public class SecondaryIndexManager impl
      /**
       * When building an index against existing data in sstables, add the given partition to the index
       */
-     public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
+     public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
      {
+         if (logger.isTraceEnabled())
+             logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
+ 
          if (!indexes.isEmpty())
          {
-             DecoratedKey key = partition.partitionKey();
-             Set<Index.Indexer> indexers = indexes.stream()
-                                                  .map(index -> index.indexerFor(key,
-                                                                                 partition.columns(),
-                                                                                 nowInSec,
-                                                                                 opGroup,
-                                                                                 IndexTransaction.Type.UPDATE))
-                                                  .filter(Objects::nonNull)
-                                                  .collect(Collectors.toSet());
- 
-             indexers.forEach(Index.Indexer::begin);
- 
-             try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+             SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
+                                                                                           FBUtilities.nowInSeconds(),
+                                                                                           key);
+             int nowInSec = cmd.nowInSec();
+             boolean readStatic = false;
+ 
 -            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
++            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT);
+             while (!pager.isExhausted())
              {
-                 if (!filtered.staticRow().isEmpty())
-                     indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
- 
-                 while (filtered.hasNext())
 -                try (ReadOrderGroup readGroup = cmd.startOrderGroup();
++                try (ReadExecutionController controller = cmd.executionController();
+                      OpOrder.Group writeGroup = Keyspace.writeOrder.start();
+                      RowIterator partition =
 -                        PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
++                        PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize, controller),
+                                                           cmd))
                  {
-                     Row row = filtered.next();
-                     indexers.forEach(indexer -> indexer.insertRow(row));
+                     Set<Index.Indexer> indexers = indexes.stream()
+                                                          .map(index -> index.indexerFor(key,
+                                                                                         partition.columns(),
+                                                                                         nowInSec,
+                                                                                         writeGroup,
+                                                                                         IndexTransaction.Type.UPDATE))
+                                                          .filter(Objects::nonNull)
+                                                          .collect(Collectors.toSet());
+ 
+                     indexers.forEach(Index.Indexer::begin);
+ 
+                     // only process the static row once per partition
+                     if (!readStatic && !partition.staticRow().isEmpty())
+                     {
+                         indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+                         readStatic = true;
+                     }
+ 
+                     while (partition.hasNext())
+                     {
+                         Row row = partition.next();
+                         indexers.forEach(indexer -> indexer.insertRow(row));
+                     }
+ 
+                     indexers.forEach(Index.Indexer::finish);
                  }
              }
- 
-             indexers.forEach(Index.Indexer::finish);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
index 8ea7a68,0000000..811d857
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@@ -1,78 -1,0 +1,79 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index.internal;
 +
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.compaction.CompactionInfo;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +/**
 + * Manages building an entire index from column family data. Runs on the compaction manager.
 + */
 +public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
 +{
 +    private final ColumnFamilyStore cfs;
 +    private final Set<Index> indexers;
 +    private final ReducingKeyIterator iter;
 +    private final UUID compactionId;
 +
 +    public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
 +    {
 +        this.cfs = cfs;
 +        this.indexers = indexers;
 +        this.iter = iter;
 +        this.compactionId = UUIDGen.getTimeUUID();
 +    }
 +
 +    public CompactionInfo getCompactionInfo()
 +    {
 +        return new CompactionInfo(cfs.metadata,
 +                OperationType.INDEX_BUILD,
 +                iter.getBytesRead(),
 +                iter.getTotalBytes(),
 +                compactionId);
 +    }
 +
 +    public void build()
 +    {
 +        try
 +        {
++            int pageSize = cfs.indexManager.calculateIndexingPageSize();
 +            while (iter.hasNext())
 +            {
 +                if (isStopRequested())
 +                    throw new CompactionInterruptedException(getCompactionInfo());
 +                DecoratedKey key = iter.next();
-                 Keyspace.indexPartition(key, cfs, indexers);
++                cfs.indexManager.indexPartition(key, indexers, pageSize);
 +            }
 +        }
 +        finally
 +        {
 +            iter.close();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 4a43210,33e7182..a462e2f
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -46,11 -44,15 +44,15 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.db.marshal.Int32Type;
  import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+ import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.index.transactions.IndexTransaction;
  import org.apache.cassandra.schema.IndexMetadata;
  import org.apache.cassandra.schema.Indexes;
 -import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.OpOrder;
  
  import static org.apache.cassandra.Util.throwAssert;
  import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;