You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/12/14 00:45:51 UTC
[12/19] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 003b624,0000000..a6ed3ba
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -1,1112 -1,0 +1,1114 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index;
+
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ColumnDefinition;
++import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.PartitionIterators;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.transactions.*;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.service.pager.SinglePartitionPager;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
+ * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
+ * and so on.
+ *
+ * The Index interface defines a number of methods which return Callable<?>. These are primarily the
+ * management tasks for an index implementation. Most of them are currently executed in a blocking
+ * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
+ * much all cases, as tasks like flushing an index needs to be executed synchronously to avoid potentially
+ * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could
+ * then be defined with as void and called directly from SIM (rather than being run via the executor service).
+ * Separating the task defintion from execution gives us greater flexibility though, so that in future, for example,
+ * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
+ *
+ * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
+ * involve a significant effort, building a new index over any existing data. We perform this task asynchronously;
+ * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
+ * indexes is performed on the CompactionManager.
+ *
+ * This class also provides instances of processors which listen to updates to the base table and forward to
+ * registered Indexes the info required to keep those indexes up to date.
+ * There are two variants of these processors, each with a factory method provided by SIM:
+ * IndexTransaction: deals with updates generated on the regular write path.
+ * CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
+ * Further details on their usage and lifecycles can be found in the interface definitions below.
+ *
+ * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
+ * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
+ * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
+ * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
+ * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
+ * a target replica.
+ */
+public class SecondaryIndexManager implements IndexRegistry
+{
+ private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
+
+ // default page size (in rows) when rebuilding the index for a whole partition
+ public static final int DEFAULT_PAGE_SIZE = 10000;
+
+ private Map<String, Index> indexes = Maps.newConcurrentMap();
+
+ /**
+ * The indexes that are ready to server requests.
+ */
+ private Set<String> builtIndexes = Sets.newConcurrentHashSet();
+
+ // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
+ private static final ExecutorService asyncExecutor =
+ new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("SecondaryIndexManagement"),
+ "internal");
+
+ // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
+ private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
+
+ /**
+ * The underlying column family containing the source data for these indexes
+ */
+ public final ColumnFamilyStore baseCfs;
+
+ public SecondaryIndexManager(ColumnFamilyStore baseCfs)
+ {
+ this.baseCfs = baseCfs;
+ }
+
+
+ /**
+ * Drops and adds new indexes associated with the underlying CF
+ */
+ public void reload()
+ {
+ // figure out what needs to be added and dropped.
+ Indexes tableIndexes = baseCfs.metadata.getIndexes();
+ indexes.keySet()
+ .stream()
+ .filter(indexName -> !tableIndexes.has(indexName))
+ .forEach(this::removeIndex);
+
+ // we call add for every index definition in the collection as
+ // some may not have been created here yet, only added to schema
+ for (IndexMetadata tableIndex : tableIndexes)
+ addIndex(tableIndex);
+ }
+
+ private Future<?> reloadIndex(IndexMetadata indexDef)
+ {
+ Index index = indexes.get(indexDef.name);
+ Callable<?> reloadTask = index.getMetadataReloadTask(indexDef);
+ return reloadTask == null
+ ? Futures.immediateFuture(null)
+ : blockingExecutor.submit(reloadTask);
+ }
+
+ private Future<?> createIndex(IndexMetadata indexDef)
+ {
+ Index index = createInstance(indexDef);
+ index.register(this);
+
+ // if the index didn't register itself, we can probably assume that no initialization needs to happen
+ final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name)
+ ? index.getInitializationTask()
+ : null;
+ if (initialBuildTask == null)
+ {
+ // We need to make sure that the index is marked as built in the case where the initialBuildTask
+ // does not need to be run (if the index didn't register itself or if the base table was empty).
+ markIndexBuilt(indexDef.name);
+ return Futures.immediateFuture(null);
+ }
+ return asyncExecutor.submit(index.getInitializationTask());
+ }
+
+ /**
+ * Adds and builds a index
+ * @param indexDef the IndexMetadata describing the index
+ */
+ public synchronized Future<?> addIndex(IndexMetadata indexDef)
+ {
+ if (indexes.containsKey(indexDef.name))
+ return reloadIndex(indexDef);
+ else
+ return createIndex(indexDef);
+ }
+
+ /**
+ * Checks if the specified index is queryable.
+ *
+ * @param index the index
+ * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
+ */
+ public boolean isIndexQueryable(Index index)
+ {
+ return builtIndexes.contains(index.getIndexMetadata().name);
+ }
+
+ public synchronized void removeIndex(String indexName)
+ {
+ Index index = unregisterIndex(indexName);
+ if (null != index)
+ {
+ markIndexRemoved(indexName);
+ executeBlocking(index.getInvalidateTask());
+ }
+ }
+
+
+ public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
+ {
+ if (indexes.isEmpty())
+ return Collections.emptySet();
+
+ Set<IndexMetadata> dependentIndexes = new HashSet<>();
+ for (Index index : indexes.values())
+ if (index.dependsOn(column))
+ dependentIndexes.add(index.getIndexMetadata());
+
+ return dependentIndexes;
+ }
+
+ /**
+ * Called when dropping a Table
+ */
+ public void markAllIndexesRemoved()
+ {
+ getBuiltIndexNames().forEach(this::markIndexRemoved);
+ }
+
+ /**
+ * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
+ * Caller must acquire and release references to the sstables used here.
+ * Note also that only this method of (re)building indexes:
+ * a) takes a set of index *names* rather than Indexers
+ * b) marks exsiting indexes removed prior to rebuilding
+ *
+ * @param sstables the data to build from
+ * @param indexNames the list of indexes to be rebuilt
+ */
+ public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
+ {
+ Set<Index> toRebuild = indexes.values().stream()
+ .filter(index -> indexNames.contains(index.getIndexMetadata().name))
+ .filter(Index::shouldBuildBlocking)
+ .collect(Collectors.toSet());
+ if (toRebuild.isEmpty())
+ {
+ logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
+ return;
+ }
+
+ toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name));
+
+ buildIndexesBlocking(sstables, toRebuild);
+
+ toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name));
+ }
+
+ public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
+ {
+ buildIndexesBlocking(sstables, indexes.values()
+ .stream()
+ .filter(Index::shouldBuildBlocking)
+ .collect(Collectors.toSet()));
+ }
+
+ // For convenience, may be called directly from Index impls
+ public void buildIndexBlocking(Index index)
+ {
+ if (index.shouldBuildBlocking())
+ {
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
+ Refs<SSTableReader> sstables = viewFragment.refs)
+ {
+ buildIndexesBlocking(sstables, Collections.singleton(index));
+ markIndexBuilt(index.getIndexMetadata().name);
+ }
+ }
+ }
+
+ /**
+ * Checks if the specified {@link ColumnFamilyStore} is a secondary index.
+ *
+ * @param cfs the <code>ColumnFamilyStore</code> to check.
+ * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
+ * <code>false</code> otherwise.
+ */
+ public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs)
+ {
+ return isIndexColumnFamily(cfs.name);
+ }
+
+ /**
+ * Checks if the specified {@link ColumnFamilyStore} is the one secondary index.
+ *
+ * @param cfName the name of the <code>ColumnFamilyStore</code> to check.
+ * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
+ * <code>false</code> otherwise.
+ */
+ public static boolean isIndexColumnFamily(String cfName)
+ {
+ return cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR);
+ }
+
+ /**
+ * Returns the parent of the specified {@link ColumnFamilyStore}.
+ *
+ * @param cfs the <code>ColumnFamilyStore</code>
+ * @return the parent of the specified <code>ColumnFamilyStore</code>
+ */
+ public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs)
+ {
+ String parentCfs = getParentCfsName(cfs.name);
+ return cfs.keyspace.getColumnFamilyStore(parentCfs);
+ }
+
+ /**
+ * Returns the parent name of the specified {@link ColumnFamilyStore}.
+ *
+ * @param cfName the <code>ColumnFamilyStore</code> name
+ * @return the parent name of the specified <code>ColumnFamilyStore</code>
+ */
+ public static String getParentCfsName(String cfName)
+ {
+ assert isIndexColumnFamily(cfName);
+ return StringUtils.substringBefore(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
+ }
+
+ /**
+ * Returns the index name
+ *
+ * @param cfs the <code>ColumnFamilyStore</code>
+ * @return the index name
+ */
+ public static String getIndexName(ColumnFamilyStore cfs)
+ {
+ return getIndexName(cfs.name);
+ }
+
+ /**
+ * Returns the index name
+ *
+ * @param cfName the <code>ColumnFamilyStore</code> name
+ * @return the index name
+ */
+ public static String getIndexName(String cfName)
+ {
+ assert isIndexColumnFamily(cfName);
+ return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
+ }
+
+ private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
+ {
+ if (indexes.isEmpty())
+ return;
+
+ logger.info("Submitting index build of {} for data in {}",
+ indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
+ sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+
+ SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+ indexes,
+ new ReducingKeyIterator(sstables));
+ Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+ FBUtilities.waitOnFuture(future);
+
+ flushIndexesBlocking(indexes);
+ logger.info("Index build of {} complete",
+ indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")));
+ }
+
+ /**
+ * Marks the specified index as build.
+ * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
+ * @param indexName the index name
+ */
+ public void markIndexBuilt(String indexName)
+ {
+ builtIndexes.add(indexName);
- SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
++ if (DatabaseDescriptor.isDaemonInitialized())
++ SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
+ }
+
+ /**
+ * Marks the specified index as removed.
+ * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
+ * @param indexName the index name
+ */
+ public void markIndexRemoved(String indexName)
+ {
+ SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
+ }
+
+ public Index getIndexByName(String indexName)
+ {
+ return indexes.get(indexName);
+ }
+
+ private Index createInstance(IndexMetadata indexDef)
+ {
+ Index newIndex;
+ if (indexDef.isCustom())
+ {
+ assert indexDef.options != null;
+ String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+ assert ! Strings.isNullOrEmpty(className);
+ try
+ {
+ Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
+ Constructor<? extends Index> ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
+ newIndex = (Index)ctor.newInstance(baseCfs, indexDef);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ {
+ newIndex = CassandraIndex.newIndex(baseCfs, indexDef);
+ }
+ return newIndex;
+ }
+
+ /**
+ * Truncate all indexes
+ */
+ public void truncateAllIndexesBlocking(final long truncatedAt)
+ {
+ executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
+ }
+
+ /**
+ * Remove all indexes
+ */
+ public void invalidateAllIndexesBlocking()
+ {
+ markAllIndexesRemoved();
+ executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
+ }
+
+ /**
+ * Perform a blocking flush all indexes
+ */
+ public void flushAllIndexesBlocking()
+ {
+ flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
+ }
+
+ /**
+ * Perform a blocking flush of selected indexes
+ */
+ public void flushIndexesBlocking(Set<Index> indexes)
+ {
+ if (indexes.isEmpty())
+ return;
+
+ List<Future<?>> wait = new ArrayList<>();
+ List<Index> nonCfsIndexes = new ArrayList<>();
+
+ // for each CFS backed index, submit a flush task which we'll wait on for completion
+ // for the non-CFS backed indexes, we'll flush those while we wait.
+ synchronized (baseCfs.getTracker())
+ {
+ indexes.forEach(index ->
+ index.getBackingTable()
+ .map(cfs -> wait.add(cfs.forceFlush()))
+ .orElseGet(() -> nonCfsIndexes.add(index)));
+ }
+
+ executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
+ FBUtilities.waitOnFutures(wait);
+ }
+
+ /**
+ * Performs a blocking flush of all custom indexes
+ */
+ public void flushAllNonCFSBackedIndexesBlocking()
+ {
+ executeAllBlocking(indexes.values()
+ .stream()
+ .filter(index -> !index.getBackingTable().isPresent()),
+ Index::getBlockingFlushTask);
+ }
+
+ /**
+ * @return all indexes which are marked as built and ready to use
+ */
+ public List<String> getBuiltIndexNames()
+ {
+ Set<String> allIndexNames = new HashSet<>();
+ indexes.values().stream()
+ .map(i -> i.getIndexMetadata().name)
+ .forEach(allIndexNames::add);
+ return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
+ }
+
+ /**
+ * @return all backing Tables used by registered indexes
+ */
+ public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
+ {
+ Set<ColumnFamilyStore> backingTables = new HashSet<>();
+ indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
+ return backingTables;
+ }
+
+ /**
+ * @return if there are ANY indexes registered for this table
+ */
+ public boolean hasIndexes()
+ {
+ return !indexes.isEmpty();
+ }
+
+ /**
+ * When building an index against existing data in sstables, add the given partition to the index
+ */
+ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
+
+ if (!indexes.isEmpty())
+ {
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
+ FBUtilities.nowInSeconds(),
+ key);
+ int nowInSec = cmd.nowInSec();
+ boolean readStatic = false;
+
+ SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
+ while (!pager.isExhausted())
+ {
+ try (ReadOrderGroup readGroup = cmd.startOrderGroup();
+ OpOrder.Group writeGroup = Keyspace.writeOrder.start();
+ RowIterator partition =
+ PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
+ cmd))
+ {
+ Set<Index.Indexer> indexers = indexes.stream()
+ .map(index -> index.indexerFor(key,
+ partition.columns(),
+ nowInSec,
+ writeGroup,
+ IndexTransaction.Type.UPDATE))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ indexers.forEach(Index.Indexer::begin);
+
+ // only process the static row once per partition
+ if (!readStatic && !partition.staticRow().isEmpty())
+ {
+ indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+ readStatic = true;
+ }
+
+ while (partition.hasNext())
+ {
+ Row row = partition.next();
+ indexers.forEach(indexer -> indexer.insertRow(row));
+ }
+
+ indexers.forEach(Index.Indexer::finish);
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the page size used when indexing an entire partition
+ */
+ public int calculateIndexingPageSize()
+ {
+ if (Boolean.getBoolean("cassandra.force_default_indexing_page_size"))
+ return DEFAULT_PAGE_SIZE;
+
+ double targetPageSizeInBytes = 32 * 1024 * 1024;
+ double meanPartitionSize = baseCfs.getMeanPartitionSize();
+ if (meanPartitionSize <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ int meanCellsPerPartition = baseCfs.getMeanColumns();
+ if (meanCellsPerPartition <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size();
+ if (meanCellsPerPartition <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ int meanRowsPerPartition = meanCellsPerPartition / columnsPerRow;
+ double meanRowSize = meanPartitionSize / meanRowsPerPartition;
+
+ int pageSize = (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, targetPageSizeInBytes / meanRowSize));
+
+ logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})",
+ pageSize,
+ baseCfs.metadata.ksName,
+ baseCfs.metadata.cfName,
+ meanPartitionSize,
+ meanCellsPerPartition,
+ meanRowsPerPartition,
+ meanRowSize);
+
+ return pageSize;
+ }
+
+ /**
+ * Delete all data from all indexes for this partition.
+ * For when cleanup rips a partition out entirely.
+ *
+ * TODO : improve cleanup transaction to batch updates & perform them async
+ */
+ public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
+ {
+ // we need to acquire memtable lock because secondary index deletion may
+ // cause a race (see CASSANDRA-3712). This is done internally by the
+ // index transaction when it commits
+ CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(),
+ partition.columns(),
+ nowInSec);
+ indexTransaction.start();
+ indexTransaction.onPartitionDeletion(new DeletionTime(FBUtilities.timestampMicros(), nowInSec));
+ indexTransaction.commit();
+
+ while (partition.hasNext())
+ {
+ Unfiltered unfiltered = partition.next();
+ if (unfiltered.kind() != Unfiltered.Kind.ROW)
+ continue;
+
+ indexTransaction = newCleanupTransaction(partition.partitionKey(),
+ partition.columns(),
+ nowInSec);
+ indexTransaction.start();
+ indexTransaction.onRowDelete((Row)unfiltered);
+ indexTransaction.commit();
+ }
+ }
+
+ /**
+ * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
+ *
+ * This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces
+ * the search space the most.
+ *
+ * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
+ * specify are automatically included. Following that, the registered indexes are filtered to include only those
+ * which support the standard expressions in the RowFilter.
+ *
+ * The filtered set then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows
+ * method.
+ *
+ * Implementation specific validation of the target expression, either custom or standard, by the selected
+ * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
+ * the validity of the expression.
+ *
+ * This method is only called once during the lifecycle of a ReadCommand and the result is
+ * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
+ * ReadOrderGroup, or an estimate of the result size from an average index query.
+ *
+ * @param command ReadCommand to be executed
+ * @return an Index instance, ready to use during execution of the command, or null if none
+ * of the registered indexes can support the command.
+ */
+ public Index getBestIndexFor(ReadCommand command)
+ {
+ if (indexes.isEmpty() || command.rowFilter().isEmpty())
+ return null;
+
+ Set<Index> searchableIndexes = new HashSet<>();
+ for (RowFilter.Expression expression : command.rowFilter())
+ {
+ if (expression.isCustom())
+ {
+ // Only a single custom expression is allowed per query and, if present,
+ // we want to always favour the index specified in such an expression
+ RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
+ logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
+ Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
+ return indexes.get(customExpression.getTargetIndex().name);
+ }
+ else
+ {
+ indexes.values().stream()
+ .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
+ .forEach(searchableIndexes::add);
+ }
+ }
+
+ if (searchableIndexes.isEmpty())
+ {
+ logger.trace("No applicable indexes found");
+ Tracing.trace("No applicable indexes found");
+ return null;
+ }
+
+ Index selected = searchableIndexes.size() == 1
+ ? Iterables.getOnlyElement(searchableIndexes)
+ : searchableIndexes.stream()
+ .min((a, b) -> Longs.compare(a.getEstimatedResultRows(),
+ b.getEstimatedResultRows()))
+ .orElseThrow(() -> new AssertionError("Could not select most selective index"));
+
+ // pay for an additional threadlocal get() rather than build the strings unnecessarily
+ if (Tracing.isTracing())
+ {
+ Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
+ searchableIndexes.stream().map(i -> i.getIndexMetadata().name + ':' + i.getEstimatedResultRows())
+ .collect(Collectors.joining(",")),
+ selected.getIndexMetadata().name);
+ }
+ return selected;
+ }
+
+ /**
+ * Called at write time to ensure that values present in the update
+ * are valid according to the rules of all registered indexes which
+ * will process it. The partition key as well as the clustering and
+ * cell values for each row in the update may be checked by index
+ * implementations
+ * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+ * @throws InvalidRequestException
+ */
+ public void validate(PartitionUpdate update) throws InvalidRequestException
+ {
+ for (Index index : indexes.values())
+ index.validate(update);
+ }
+
+ /**
+ * IndexRegistry methods
+ */
+ public void registerIndex(Index index)
+ {
+ String name = index.getIndexMetadata().name;
+ indexes.put(name, index);
+ logger.trace("Registered index {}", name);
+ }
+
+ public void unregisterIndex(Index index)
+ {
+ unregisterIndex(index.getIndexMetadata().name);
+ }
+
+ private Index unregisterIndex(String name)
+ {
+ Index removed = indexes.remove(name);
+ builtIndexes.remove(name);
+ logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
+ name);
+ return removed;
+ }
+
+ public Index getIndex(IndexMetadata metadata)
+ {
+ return indexes.get(metadata.name);
+ }
+
+ public Collection<Index> listIndexes()
+ {
+ return ImmutableSet.copyOf(indexes.values());
+ }
+
+ /**
+ * Handling of index updates.
+ * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
+ * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
+ */
+
+ /**
+ * Transaction for updates on the write path.
+ */
+ public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
+ {
+ if (!hasIndexes())
+ return UpdateTransaction.NO_OP;
+
+ Index.Indexer[] indexers = indexes.values().stream()
+ .map(i -> i.indexerFor(update.partitionKey(),
+ update.columns(),
+ nowInSec,
+ opGroup,
+ IndexTransaction.Type.UPDATE))
+ .filter(Objects::nonNull)
+ .toArray(Index.Indexer[]::new);
+
+ return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
+ }
+
+ /**
+ * Transaction for use when merging rows during compaction
+ */
+ public CompactionTransaction newCompactionTransaction(DecoratedKey key,
+ PartitionColumns partitionColumns,
+ int versions,
+ int nowInSec)
+ {
+ // the check for whether there are any registered indexes is already done in CompactionIterator
+ return new IndexGCTransaction(key, partitionColumns, versions, nowInSec, listIndexes());
+ }
+
+ /**
+ * Transaction for use when removing partitions during cleanup
+ */
+ public CleanupTransaction newCleanupTransaction(DecoratedKey key,
+ PartitionColumns partitionColumns,
+ int nowInSec)
+ {
+ if (!hasIndexes())
+ return CleanupTransaction.NO_OP;
+
+ return new CleanupGCTransaction(key, partitionColumns, nowInSec, listIndexes());
+ }
+
+ /**
+ * A single use transaction for processing a partition update on the regular write path
+ */
+ private static final class WriteTimeTransaction implements UpdateTransaction
+ {
+ private final Index.Indexer[] indexers;
+
+ private WriteTimeTransaction(Index.Indexer...indexers)
+ {
+ // don't allow null indexers, if we don't need any use a NullUpdater object
+ for (Index.Indexer indexer : indexers) assert indexer != null;
+ this.indexers = indexers;
+ }
+
+ public void start()
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.begin();
+ }
+
+ public void onPartitionDeletion(DeletionTime deletionTime)
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.partitionDelete(deletionTime);
+ }
+
+ public void onRangeTombstone(RangeTombstone tombstone)
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.rangeTombstone(tombstone);
+ }
+
+ public void onInserted(Row row)
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.insertRow(row);
+ }
+
+ public void onUpdated(Row existing, Row updated)
+ {
+ final Row.Builder toRemove = BTreeRow.sortedBuilder();
+ toRemove.newRow(existing.clustering());
+ toRemove.addPrimaryKeyLivenessInfo(existing.primaryKeyLivenessInfo());
+ toRemove.addRowDeletion(existing.deletion());
+ final Row.Builder toInsert = BTreeRow.sortedBuilder();
+ toInsert.newRow(updated.clustering());
+ toInsert.addPrimaryKeyLivenessInfo(updated.primaryKeyLivenessInfo());
+ toInsert.addRowDeletion(updated.deletion());
+ // diff listener collates the columns to be added & removed from the indexes
+ RowDiffListener diffListener = new RowDiffListener()
+ {
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ }
+
+ public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+ {
+ }
+
+ public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+ {
+ }
+
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ if (merged != null && !merged.equals(original))
+ toInsert.addCell(merged);
+
+ if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
+ toRemove.addCell(original);
+
+ }
+ };
+ Rows.diff(diffListener, updated, existing);
+ Row oldRow = toRemove.build();
+ Row newRow = toInsert.build();
+ for (Index.Indexer indexer : indexers)
+ indexer.updateRow(oldRow, newRow);
+ }
+
+ public void commit()
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.finish();
+ }
+
+ private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
+ {
+ // If either the value or timestamp is different, then we
+ // should delete from the index. If not, then we can infer that
+ // at least one of the cells is an ExpiringColumn and that the
+ // difference is in the expiry time. In this case, we don't want to
+ // delete the old value from the index as the tombstone we insert
+ // will just hide the inserted value.
+ // Completely identical cells (including expiring columns with
+ // identical ttl & localExpirationTime) will not get this far due
+ // to the oldCell.equals(newCell) in StandardUpdater.update
+ return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
+ }
+ }
+
+ /**
+ * A single-use transaction for updating indexes for a single partition during compaction where the only
+ * operation is to merge rows
+ * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+ * a single partition
+ */
+ private static final class IndexGCTransaction implements CompactionTransaction
+ {
+ private final DecoratedKey key;
+ private final PartitionColumns columns;
+ private final int versions;
+ private final int nowInSec;
+ private final Collection<Index> indexes;
+
+ private Row[] rows;
+
+ private IndexGCTransaction(DecoratedKey key,
+ PartitionColumns columns,
+ int versions,
+ int nowInSec,
+ Collection<Index> indexes)
+ {
+ this.key = key;
+ this.columns = columns;
+ this.versions = versions;
+ this.indexes = indexes;
+ this.nowInSec = nowInSec;
+ }
+
+ public void start()
+ {
+ if (versions > 0)
+ rows = new Row[versions];
+ }
+
+ public void onRowMerge(Row merged, Row...versions)
+ {
+ // Diff listener constructs rows representing deltas between the merged and original versions
+ // These delta rows are then passed to registered indexes for removal processing
+ final Row.Builder[] builders = new Row.Builder[versions.length];
+ RowDiffListener diffListener = new RowDiffListener()
+ {
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ if (original != null && (merged == null || !merged.isLive(nowInSec)))
+ getBuilder(i, clustering).addPrimaryKeyLivenessInfo(original);
+ }
+
+ public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+ {
+ }
+
+ public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+ {
+ }
+
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ if (original != null && (merged == null || !merged.isLive(nowInSec)))
+ getBuilder(i, clustering).addCell(original);
+ }
+
+ private Row.Builder getBuilder(int index, Clustering clustering)
+ {
+ if (builders[index] == null)
+ {
+ builders[index] = BTreeRow.sortedBuilder();
+ builders[index].newRow(clustering);
+ }
+ return builders[index];
+ }
+ };
+
+ Rows.diff(diffListener, merged, versions);
+
+ for(int i = 0; i < builders.length; i++)
+ if (builders[i] != null)
+ rows[i] = builders[i].build();
+ }
+
+ public void commit()
+ {
+ if (rows == null)
+ return;
+
+ try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+ {
+ for (Index index : indexes)
+ {
+ Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.COMPACTION);
+ if (indexer == null)
+ continue;
+
+ indexer.begin();
+ for (Row row : rows)
+ if (row != null)
+ indexer.removeRow(row);
+ indexer.finish();
+ }
+ }
+ }
+ }
+
+ /**
+ * A single-use transaction for updating indexes for a single partition during cleanup, where
+ * partitions and rows are only removed
+ * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+ * a single partition
+ */
+ private static final class CleanupGCTransaction implements CleanupTransaction
+ {
+ private final DecoratedKey key;
+ private final PartitionColumns columns;
+ private final int nowInSec;
+ private final Collection<Index> indexes;
+
+ private Row row;
+ private DeletionTime partitionDelete;
+
+ private CleanupGCTransaction(DecoratedKey key,
+ PartitionColumns columns,
+ int nowInSec,
+ Collection<Index> indexes)
+ {
+ this.key = key;
+ this.columns = columns;
+ this.indexes = indexes;
+ this.nowInSec = nowInSec;
+ }
+
+ public void start()
+ {
+ }
+
+ public void onPartitionDeletion(DeletionTime deletionTime)
+ {
+ partitionDelete = deletionTime;
+ }
+
+ public void onRowDelete(Row row)
+ {
+ this.row = row;
+ }
+
+ public void commit()
+ {
+ if (row == null && partitionDelete == null)
+ return;
+
+ try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+ {
+ for (Index index : indexes)
+ {
+ Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.CLEANUP);
+ if (indexer == null)
+ continue;
+
+ indexer.begin();
+
+ if (partitionDelete != null)
+ indexer.partitionDelete(partitionDelete);
+
+ if (row != null)
+ indexer.removeRow(row);
+
+ indexer.finish();
+ }
+ }
+ }
+ }
+
+ private static void executeBlocking(Callable<?> task)
+ {
+ if (null != task)
+ FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+ }
+
+ private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
+ {
+ List<Future<?>> waitFor = new ArrayList<>();
+ indexers.forEach(indexer -> {
+ Callable<?> task = function.apply(indexer);
+ if (null != task)
+ waitFor.add(blockingExecutor.submit(task));
+ });
+ FBUtilities.waitOnFutures(waitFor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index f0cdcf5,ba060d4..3283723
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -2183,10 -2246,10 +2183,10 @@@ public abstract class SSTableReader ext
// Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
// the read meter when in client mode.
- if (Schema.isSystemKeyspace(desc.ksname))
- if (SystemKeyspace.NAME.equals(desc.ksname) || !DatabaseDescriptor.isDaemonInitialized())
++ if (Schema.isSystemKeyspace(desc.ksname) || !DatabaseDescriptor.isDaemonInitialized())
{
readMeter = null;
- readMeterSyncFuture = null;
+ readMeterSyncFuture = NULL;
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 3d45393,98b8e23..3d8d03b
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -140,86 -137,7 +140,87 @@@ public abstract class CQLTeste
// We don't use USE_PREPARED_VALUES in the code below so some test can foce value preparation (if the result
// is not expected to be the same without preparation)
private boolean usePrepared = USE_PREPARED_VALUES;
- private static final boolean reusePrepared = Boolean.valueOf(System.getProperty("cassandra.test.reuse_prepared", "true"));
+ private static boolean reusePrepared = REUSE_PREPARED;
+
+ public static void prepareServer()
+ {
+ if (isServerPrepared)
+ return;
+
+ // Cleanup first
+ try
+ {
+ cleanupAndLeaveDirs();
+ }
+ catch (IOException e)
+ {
+ logger.error("Failed to cleanup and recreate directories.");
+ throw new RuntimeException(e);
+ }
+
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+ {
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ logger.error("Fatal exception in thread " + t, e);
+ }
+ });
+
+ ThreadAwareSecurityManager.install();
+
++ DatabaseDescriptor.setDaemonInitialized();
+ Keyspace.setInitialized();
+ isServerPrepared = true;
+ }
+
+ public static void cleanupAndLeaveDirs() throws IOException
+ {
+ // We need to stop and unmap all CLS instances prior to cleanup() or we'll get failures on Windows.
+ CommitLog.instance.stopUnsafe(true);
+ mkdirs();
+ cleanup();
+ mkdirs();
+ CommitLog.instance.restartUnsafe();
+ }
+
+ public static void cleanup()
+ {
+ // clean up commitlog
+ String[] directoryNames = { DatabaseDescriptor.getCommitLogLocation(), };
+ for (String dirName : directoryNames)
+ {
+ File dir = new File(dirName);
+ if (!dir.exists())
+ throw new RuntimeException("No such directory: " + dir.getAbsolutePath());
+ FileUtils.deleteRecursive(dir);
+ }
+
+ cleanupSavedCaches();
+
+ // clean up data directory which are stored as data directory/keyspace/data files
+ for (String dirName : DatabaseDescriptor.getAllDataFileLocations())
+ {
+ File dir = new File(dirName);
+ if (!dir.exists())
+ throw new RuntimeException("No such directory: " + dir.getAbsolutePath());
+ FileUtils.deleteRecursive(dir);
+ }
+ }
+
+ public static void mkdirs()
+ {
+ DatabaseDescriptor.createAllDirectories();
+ }
+
+ public static void cleanupSavedCaches()
+ {
+ File cachesDir = new File(DatabaseDescriptor.getSavedCachesLocation());
+
+ if (!cachesDir.exists() || !cachesDir.isDirectory())
+ return;
+
+ FileUtils.delete(cachesDir.listFiles());
+ }
@BeforeClass
public static void setUpClass()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 557beba,5e2fffe..e6d18c4
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@@ -32,9 -32,9 +32,10 @@@ import org.junit.BeforeClass
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
index 5ac1b31,bc5be46..37b5fa9
--- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
@@@ -28,7 -27,7 +28,8 @@@ import org.junit.BeforeClass
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
index 6f76db4,0000000..a2c9cf9
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
@@@ -1,56 -1,0 +1,63 @@@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.UUID;
+
++import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.hints.HintsService;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
+public class HintedHandOffMetricsTest
+{
++ @BeforeClass
++ public static void initDD()
++ {
++ DatabaseDescriptor.setDaemonInitialized();
++ }
++
+ @Test
+ public void testHintsMetrics() throws Exception
+ {
+ DatabaseDescriptor.getHintsDirectory().mkdirs();
+
+ for (int i = 0; i < 99; i++)
+ HintsService.instance.metrics.incrPastWindow(InetAddress.getLocalHost());
+ HintsService.instance.metrics.log();
+
+ UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS);
+ Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
+ assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
index f0c850d,4e160c2..bb2b9b0
--- a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
@@@ -18,14 -18,14 +18,21 @@@
*/
package org.apache.cassandra.utils.concurrent;
++import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import junit.framework.Assert;
++import org.apache.cassandra.config.DatabaseDescriptor;
@Ignore
public abstract class AbstractTransactionalTest
{
++ @BeforeClass
++ public static void setupDD()
++ {
++ DatabaseDescriptor.setDaemonInitialized();
++ }
protected abstract TestableTransaction newTest() throws Exception;