You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/12/13 10:37:51 UTC
[15/19] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8bbe2f59
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8bbe2f59
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8bbe2f59
Branch: refs/heads/cassandra-3.11
Commit: 8bbe2f5951be9ddfd74e455ba20a09b3dc309cfe
Parents: 88c5956 36ce4e0
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Dec 13 10:21:58 2016 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Tue Dec 13 10:29:08 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/jvm.options | 5 +
src/java/org/apache/cassandra/db/Keyspace.java | 23 ----
.../cassandra/index/SecondaryIndexManager.java | 109 ++++++++++++----
.../internal/CollatedViewIndexBuilder.java | 3 +-
.../apache/cassandra/index/CustomIndexTest.java | 130 ++++++++++++++++++-
6 files changed, 222 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7413086,a65a147..2156ab9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -165,11 -55,6 +165,12 @@@ Merged from 3.0
* Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
* Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
Merged from 2.2:
++ * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796)
+ * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
+ * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
+ * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
+ * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
* Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
* Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/conf/jvm.options
----------------------------------------------------------------------
diff --cc conf/jvm.options
index 0e329d6,a7b3bd8..f91466a
--- a/conf/jvm.options
+++ b/conf/jvm.options
@@@ -8,138 -8,6 +8,143 @@@
# - dynamic flags will be appended to these on cassandra-env #
###########################################################################
+######################
+# STARTUP PARAMETERS #
+######################
+
+# Uncomment any of the following properties to enable specific startup parameters
+
+# In a multi-instance deployment, multiple Cassandra instances will independently assume that all
+# CPU processors are available to it. This setting allows you to specify a smaller set of processors
+# and perhaps have affinity.
+#-Dcassandra.available_processors=number_of_processors
+
+# The directory location of the cassandra.yaml file.
+#-Dcassandra.config=directory
+
+# Sets the initial partitioner token for a node the first time the node is started.
+#-Dcassandra.initial_token=token
+
+# Set to false to start Cassandra on a node but not have the node join the cluster.
+#-Dcassandra.join_ring=true|false
+
+# Set to false to clear all gossip state for the node on restart. Use when you have changed node
+# information in cassandra.yaml (such as listen_address).
+#-Dcassandra.load_ring_state=true|false
+
+# Enable pluggable metrics reporter. See Pluggable metrics reporting in Cassandra 2.0.2.
+#-Dcassandra.metricsReporterConfigFile=file
+
+# Set the port on which the CQL native transport listens for clients. (Default: 9042)
+#-Dcassandra.native_transport_port=port
+
+# Overrides the partitioner. (Default: org.apache.cassandra.dht.Murmur3Partitioner)
+#-Dcassandra.partitioner=partitioner
+
+# To replace a node that has died, restart a new node in its place specifying the address of the
+# dead node. The new node must not have any data in its data directory, that is, it must be in the
+# same state as before bootstrapping.
+#-Dcassandra.replace_address=listen_address or broadcast_address of dead node
+
+# Allow restoring specific tables from an archived commit log.
+#-Dcassandra.replayList=table
+
+# Allows overriding of the default RING_DELAY (1000ms), which is the amount of time a node waits
+# before joining the ring.
+#-Dcassandra.ring_delay_ms=ms
+
+# Set the port for the Thrift RPC service, which is used for client connections. (Default: 9160)
+#-Dcassandra.rpc_port=port
+
+# Set the SSL port for encrypted communication. (Default: 7001)
+#-Dcassandra.ssl_storage_port=port
+
+# Enable or disable the native transport server. See start_native_transport in cassandra.yaml.
+# cassandra.start_native_transport=true|false
+
+# Enable or disable the Thrift RPC server. (Default: true)
+#-Dcassandra.start_rpc=true/false
+
+# Set the port for inter-node communication. (Default: 7000)
+#-Dcassandra.storage_port=port
+
+# Set the default location for the trigger JARs. (Default: conf/triggers)
+#-Dcassandra.triggers_dir=directory
+
+# For testing new compaction and compression strategies. It allows you to experiment with different
+# strategies and benchmark write performance differences without affecting the production workload.
+#-Dcassandra.write_survey=true
+
+# To disable configuration via JMX of auth caches (such as those for credentials, permissions and
+# roles). This will mean those config options can only be set (persistently) in cassandra.yaml
+# and will require a restart for new values to take effect.
+#-Dcassandra.disable_auth_caches_remote_configuration=true
+
++# To disable dynamic calculation of the page size used when indexing an entire partition (during
++# initial index build/rebuild). If set to true, the page size will be fixed to the default of
++# 10000 rows per page.
++#-Dcassandra.force_default_indexing_page_size=true
++
+########################
+# GENERAL JVM SETTINGS #
+########################
+
+# enable assertions. highly suggested for correct application functionality.
+-ea
+
+# enable thread priorities, primarily so we can give periodic tasks
+# a lower priority to avoid interfering with client workload
+-XX:+UseThreadPriorities
+
+# allows lowering thread priority without being root on linux - probably
+# not necessary on Windows but doesn't harm anything.
+# see http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workar
+-XX:ThreadPriorityPolicy=42
+
+# Enable heap-dump if there's an OOM
+-XX:+HeapDumpOnOutOfMemoryError
+
+# Per-thread stack size.
+-Xss256k
+
+# Larger interned string table, for gossip's benefit (CASSANDRA-6410)
+-XX:StringTableSize=1000003
+
+# Make sure all memory is faulted and zeroed on startup.
+# This helps prevent soft faults in containers and makes
+# transparent hugepage allocation more effective.
+-XX:+AlwaysPreTouch
+
+# Disable biased locking as it does not benefit Cassandra.
+-XX:-UseBiasedLocking
+
+# Enable thread-local allocation blocks and allow the JVM to automatically
+# resize them at runtime.
+-XX:+UseTLAB
+-XX:+ResizeTLAB
+-XX:+UseNUMA
+
+# http://www.evanjones.ca/jvm-mmap-pause.html
+-XX:+PerfDisableSharedMem
+
+# Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See
+# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
+# comment out this entry to enable IPv6 support).
+-Djava.net.preferIPv4Stack=true
+
+### Debug options
+
+# uncomment to enable flight recorder
+#-XX:+UnlockCommercialFeatures
+#-XX:+FlightRecorder
+
+# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
+#-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1414
+
+# uncomment to have Cassandra JVM log internal method compilation (developers only)
+#-XX:+UnlockDiagnosticVMOptions
+#-XX:+LogCompilation
+
#################
# HEAP SETTINGS #
#################
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 6e36511,003b624..ccb91c1
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -55,7 -56,10 +56,10 @@@ import org.apache.cassandra.index.trans
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
+ import org.apache.cassandra.service.pager.SinglePartitionPager;
import org.apache.cassandra.tracing.Tracing;
++import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
@@@ -536,35 -523,55 +543,55 @@@ public class SecondaryIndexManager impl
/**
* When building an index against existing data in sstables, add the given partition to the index
*/
- public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
+ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
{
+ if (logger.isTraceEnabled())
+ logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
+
if (!indexes.isEmpty())
{
- DecoratedKey key = partition.partitionKey();
- Set<Index.Indexer> indexers = indexes.stream()
- .map(index -> index.indexerFor(key,
- partition.columns(),
- nowInSec,
- opGroup,
- IndexTransaction.Type.UPDATE))
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
-
- indexers.forEach(Index.Indexer::begin);
-
- try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
+ FBUtilities.nowInSeconds(),
+ key);
+ int nowInSec = cmd.nowInSec();
+ boolean readStatic = false;
+
- SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
++ SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT);
+ while (!pager.isExhausted())
{
- if (!filtered.staticRow().isEmpty())
- indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
-
- while (filtered.hasNext())
- try (ReadOrderGroup readGroup = cmd.startOrderGroup();
++ try (ReadExecutionController controller = cmd.executionController();
+ OpOrder.Group writeGroup = Keyspace.writeOrder.start();
+ RowIterator partition =
- PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
++ PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize, controller),
+ cmd))
{
- Row row = filtered.next();
- indexers.forEach(indexer -> indexer.insertRow(row));
+ Set<Index.Indexer> indexers = indexes.stream()
+ .map(index -> index.indexerFor(key,
+ partition.columns(),
+ nowInSec,
+ writeGroup,
+ IndexTransaction.Type.UPDATE))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ indexers.forEach(Index.Indexer::begin);
+
+ // only process the static row once per partition
+ if (!readStatic && !partition.staticRow().isEmpty())
+ {
+ indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+ readStatic = true;
+ }
+
+ while (partition.hasNext())
+ {
+ Row row = partition.next();
+ indexers.forEach(indexer -> indexer.insertRow(row));
+ }
+
+ indexers.forEach(Index.Indexer::finish);
}
}
-
- indexers.forEach(Index.Indexer::finish);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
index 8ea7a68,0000000..811d857
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@@ -1,78 -1,0 +1,79 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Manages building an entire index from column family data. Runs on to compaction manager.
+ */
+public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
+{
+ private final ColumnFamilyStore cfs;
+ private final Set<Index> indexers;
+ private final ReducingKeyIterator iter;
+ private final UUID compactionId;
+
+ public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
+ {
+ this.cfs = cfs;
+ this.indexers = indexers;
+ this.iter = iter;
+ this.compactionId = UUIDGen.getTimeUUID();
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(cfs.metadata,
+ OperationType.INDEX_BUILD,
+ iter.getBytesRead(),
+ iter.getTotalBytes(),
+ compactionId);
+ }
+
+ public void build()
+ {
+ try
+ {
++ int pageSize = cfs.indexManager.calculateIndexingPageSize();
+ while (iter.hasNext())
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+ DecoratedKey key = iter.next();
- Keyspace.indexPartition(key, cfs, indexers);
++ cfs.indexManager.indexPartition(key, indexers, pageSize);
+ }
+ }
+ finally
+ {
+ iter.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 4a43210,33e7182..a462e2f
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -46,11 -44,15 +44,15 @@@ import org.apache.cassandra.db.marshal.
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.Util.throwAssert;
import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;