You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/22 22:13:15 UTC
[4/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DigestResolver.java
index 572df6f,0000000..62b4538
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@@ -1,98 -1,0 +1,98 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.net.MessageIn;
+
+public class DigestResolver extends ResponseResolver
+{
+ private volatile ReadResponse dataResponse;
+
+ public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ {
+ super(keyspace, command, consistency, maxResponseCount);
+ }
+
+ @Override
+ public void preprocess(MessageIn<ReadResponse> message)
+ {
+ super.preprocess(message);
+ if (dataResponse == null && !message.payload.isDigestResponse())
+ dataResponse = message.payload;
+ }
+
+ /**
+ * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
+ */
+ public PartitionIterator getData()
+ {
+ assert isDataPresent();
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
+ }
+
+ /*
+ * This method handles two different scenarios:
+ *
+ * a) we're handling the initial read of data from the closest replica + digests
+ * from the rest. In this case we check the digests against each other,
+ * throw an exception if there is a mismatch, otherwise return the data row.
+ *
+ * b) we're checking additional digests that arrived after the minimum to handle
+ * the requested ConsistencyLevel, i.e. asynchronous read repair check
+ */
+ public PartitionIterator resolve() throws DigestMismatchException
+ {
+ if (responses.size() == 1)
+ return getData();
+
- if (logger.isDebugEnabled())
- logger.debug("resolving {} responses", responses.size());
++ if (logger.isTraceEnabled())
++ logger.trace("resolving {} responses", responses.size());
+
+ long start = System.nanoTime();
+
+ // validate digests against each other; throw immediately on mismatch.
+ ByteBuffer digest = null;
+ for (MessageIn<ReadResponse> message : responses)
+ {
+ ReadResponse response = message.payload;
+
+ ByteBuffer newDigest = response.digest(command.metadata(), command);
+ if (digest == null)
+ digest = newDigest;
+ else if (!digest.equals(newDigest))
+ // rely on the fact that only single partition queries use digests
+ throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest);
+ }
+
- if (logger.isDebugEnabled())
- logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
++ if (logger.isTraceEnabled())
++ logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+ return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
+ }
+
+ public boolean isDataPresent()
+ {
+ return dataResponse != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index 8b1ef32,145679d..8747004
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -108,46 -102,31 +108,46 @@@ public class ReadCallback implements IA
}
}
- public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+ public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
- if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
+ boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
+ boolean failed = blockfor + failures > endpoints.size();
+ if (signaled && !failed)
+ return;
+
+ if (Tracing.isTracing())
{
- // Same as for writes, see AbstractWriteResponseHandler
- ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
- Tracing.trace("Read timeout: {}", ex.toString());
- if (logger.isTraceEnabled())
- logger.trace("Read timeout: {}", ex.toString());
- throw ex;
+ String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+ Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
}
-
- if (blockfor + failures > endpoints.size())
+ else if (logger.isDebugEnabled())
{
- ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
-
- if (logger.isTraceEnabled())
- logger.trace("Read failure: {}", ex.toString());
- throw ex;
+ String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+ logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
}
- return blockfor == 1 ? resolver.getData() : resolver.resolve();
+ // Same as for writes, see AbstractWriteResponseHandler
+ throw failed
+ ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent())
+ : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
}
- public void response(MessageIn<TMessage> message)
+ public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+ {
+ awaitResults();
+
+ PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
- if (logger.isDebugEnabled())
- logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
++ if (logger.isTraceEnabled())
++ logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ return result;
+ }
+
+ public int blockFor()
+ {
+ return blockfor;
+ }
+
+ public void response(MessageIn<ReadResponse> message)
{
resolver.preprocess(message);
int n = waitingFor(message.from)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index d209af6,af56c3a..5c94f08
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -819,86 -703,59 +819,86 @@@ public class StorageProxy implements St
}
}
- private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
+ public static boolean canDoLocalRequest(InetAddress replica)
+ {
+ return replica.equals(FBUtilities.getBroadcastAddress());
+ }
+
+ private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+ throws WriteTimeoutException, WriteFailureException
+ {
+ WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
+ Collections.<InetAddress>emptyList(),
+ endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
+ Keyspace.open(SystemKeyspace.NAME),
+ null,
+ WriteType.BATCH_LOG);
+
+ Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
+
+ if (!endpoints.current.isEmpty())
+ syncWriteToBatchlog(handler, batch, endpoints.current);
+
+ if (!endpoints.legacy.isEmpty())
+ LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
+
+ handler.get();
+ }
+
+ private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
throws WriteTimeoutException, WriteFailureException
{
- AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
- Collections.<InetAddress>emptyList(),
- ConsistencyLevel.ONE,
- Keyspace.open(SystemKeyspace.NAME),
- null,
- WriteType.BATCH_LOG);
+ MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
- MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
- .createMessage();
for (InetAddress target : endpoints)
{
- logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
- int targetVersion = MessagingService.instance().getVersion(target);
- if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- {
- insertLocal(message.payload, handler);
- }
- else if (targetVersion == MessagingService.current_version)
- {
- MessagingService.instance().sendRR(message, target, handler, false);
- }
++ logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
+ if (canDoLocalRequest(target))
+ performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
else
- {
- MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
- .createMessage(),
- target,
- handler,
- false);
- }
+ MessagingService.instance().sendRR(message, target, handler);
}
+ }
- handler.get();
+ private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+ {
+ if (!endpoints.current.isEmpty())
+ asyncRemoveFromBatchlog(endpoints.current, uuid);
+
+ if (!endpoints.legacy.isEmpty())
+ LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
}
private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
{
- AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
- Collections.<InetAddress>emptyList(),
- ConsistencyLevel.ANY,
- Keyspace.open(SystemKeyspace.NAME),
- null,
- WriteType.SIMPLE);
- Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid));
- mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
- MessageOut<Mutation> message = mutation.createMessage();
+ MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
for (InetAddress target : endpoints)
{
- if (logger.isDebugEnabled())
- logger.debug("Sending batchlog remove request {} to {}", uuid, target);
- if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
- insertLocal(message.payload, handler);
++ if (logger.isTraceEnabled())
++ logger.trace("Sending batchlog remove request {} to {}", uuid, target);
+
+ if (canDoLocalRequest(target))
+ performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid));
else
- MessagingService.instance().sendRR(message, target, handler, false);
+ MessagingService.instance().sendOneWay(message, target);
+ }
+ }
+
+ private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+ {
+ for (WriteResponseHandlerWrapper wrapper : wrappers)
+ {
+ Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
+
+ try
+ {
+ sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
+ }
+ catch (OverloadedException | WriteTimeoutException e)
+ {
+ wrapper.handler.onFailure(FBUtilities.getBroadcastAddress());
+ }
}
}
@@@ -1522,144 -1347,218 +1522,144 @@@
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
- private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
+ private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
throws UnavailableException, ReadFailureException, ReadTimeoutException
{
- List<Row> rows = new ArrayList<>(initialCommands.size());
- // (avoid allocating a new list in the common case of nothing-to-retry)
- List<ReadCommand> commandsToRetry = Collections.emptyList();
+ int cmdCount = commands.size();
- do
- {
- List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
- AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
+ SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
+ for (int i = 0; i < cmdCount; i++)
+ reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
- if (!commandsToRetry.isEmpty())
- Tracing.trace("Retrying {} commands", commandsToRetry.size());
+ for (int i = 0; i < cmdCount; i++)
+ reads[i].doInitialQueries();
- // send out read requests
- for (int i = 0; i < commands.size(); i++)
- {
- ReadCommand command = commands.get(i);
- assert !command.isDigestQuery();
+ for (int i = 0; i < cmdCount; i++)
+ reads[i].maybeTryAdditionalReplicas();
- AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
- exec.executeAsync();
- readExecutors[i] = exec;
- }
+ for (int i = 0; i < cmdCount; i++)
+ reads[i].awaitResultsAndRetryOnDigestMismatch();
- for (AbstractReadExecutor exec : readExecutors)
- exec.maybeTryAdditionalReplicas();
+ for (int i = 0; i < cmdCount; i++)
+ if (!reads[i].isDone())
+ reads[i].maybeAwaitFullDataRead();
- // read results and make a second pass for any digest mismatches
- List<ReadCommand> repairCommands = null;
- List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
- for (AbstractReadExecutor exec: readExecutors)
- {
- try
- {
- Row row = exec.get();
- if (row != null)
- {
- row = exec.command.maybeTrim(row);
- rows.add(row);
- }
+ List<PartitionIterator> results = new ArrayList<>(cmdCount);
+ for (int i = 0; i < cmdCount; i++)
+ {
+ assert reads[i].isDone();
+ results.add(reads[i].getResult());
+ }
- if (logger.isTraceEnabled())
- logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
- }
- catch (ReadTimeoutException|ReadFailureException ex)
- {
- int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
- int responseCount = exec.handler.getReceivedCount();
- String gotData = responseCount > 0
- ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
- : "";
-
- boolean isTimeout = ex instanceof ReadTimeoutException;
- if (Tracing.isTracing())
- {
- Tracing.trace("{}; received {} of {} responses{}",
- isTimeout ? "Timed out" : "Failed", responseCount, blockFor, gotData);
- }
- else if (logger.isDebugEnabled())
- {
- logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData);
- }
- throw ex;
- }
- catch (DigestMismatchException ex)
- {
- Tracing.trace("Digest mismatch: {}", ex);
+ return PartitionIterators.concat(results);
+ }
- ReadRepairMetrics.repairedBlocking.mark();
+ private static class SinglePartitionReadLifecycle
+ {
+ private final SinglePartitionReadCommand<?> command;
+ private final AbstractReadExecutor executor;
+ private final ConsistencyLevel consistency;
- // Do a full data read to resolve the correct response (and repair node that need be)
- RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size());
- ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver,
- ConsistencyLevel.ALL,
- exec.getContactedReplicas().size(),
- exec.command,
- Keyspace.open(exec.command.getKeyspace()),
- exec.handler.endpoints);
+ private PartitionIterator result;
+ private ReadCallback repairHandler;
- if (repairCommands == null)
- {
- repairCommands = new ArrayList<>();
- repairResponseHandlers = new ArrayList<>();
- }
- repairCommands.add(exec.command);
- repairResponseHandlers.add(repairHandler);
+ SinglePartitionReadLifecycle(SinglePartitionReadCommand<?> command, ConsistencyLevel consistency)
+ {
+ this.command = command;
+ this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);
+ this.consistency = consistency;
+ }
- MessageOut<ReadCommand> message = exec.command.createMessage();
- for (InetAddress endpoint : exec.getContactedReplicas())
- {
- Tracing.trace("Enqueuing full data read to {}", endpoint);
- MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
- }
- }
- }
+ boolean isDone()
+ {
+ return result != null;
+ }
- commandsToRetry.clear();
+ void doInitialQueries()
+ {
+ executor.executeAsync();
+ }
- // read the results for the digest mismatch retries
- if (repairResponseHandlers != null)
- {
- for (int i = 0; i < repairCommands.size(); i++)
- {
- ReadCommand command = repairCommands.get(i);
- ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i);
+ void maybeTryAdditionalReplicas()
+ {
+ executor.maybeTryAdditionalReplicas();
+ }
- Row row;
- try
- {
- row = handler.get();
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // full data requested from each node here, no digests should be sent
- }
- catch (ReadTimeoutException e)
- {
- if (Tracing.isTracing())
- Tracing.trace("Timed out waiting on digest mismatch repair requests");
- else
- logger.trace("Timed out waiting on digest mismatch repair requests");
- // the caught exception here will have CL.ALL from the repair command,
- // not whatever CL the initial command was at (CASSANDRA-7947)
- int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
- throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
- }
+ void awaitResultsAndRetryOnDigestMismatch() throws ReadFailureException, ReadTimeoutException
+ {
+ try
+ {
+ result = executor.get();
+ }
+ catch (DigestMismatchException ex)
+ {
+ Tracing.trace("Digest mismatch: {}", ex);
- RowDataResolver resolver = (RowDataResolver)handler.resolver;
- try
- {
- // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
- // behind on writes in case the out-of-sync row is read multiple times in quick succession
- FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
- }
- catch (TimeoutException e)
- {
- if (Tracing.isTracing())
- Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements");
- else
- logger.trace("Timed out waiting on digest mismatch repair acknowledgements");
- int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
- throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
- }
+ ReadRepairMetrics.repairedBlocking.mark();
- // retry any potential short reads
- ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
- if (retryCommand != null)
- {
- Tracing.trace("Issuing retry for read command");
- if (commandsToRetry == Collections.EMPTY_LIST)
- commandsToRetry = new ArrayList<>();
- commandsToRetry.add(retryCommand);
- continue;
- }
+ // Do a full data read to resolve the correct response (and repair node that need be)
+ Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+ DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size());
+ repairHandler = new ReadCallback(resolver,
+ ConsistencyLevel.ALL,
+ executor.getContactedReplicas().size(),
+ command,
+ keyspace,
+ executor.handler.endpoints);
- if (row != null)
- {
- row = command.maybeTrim(row);
- rows.add(row);
- }
+ for (InetAddress endpoint : executor.getContactedReplicas())
+ {
+ MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
+ Tracing.trace("Enqueuing full data read to {}", endpoint);
+ MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
}
}
- } while (!commandsToRetry.isEmpty());
-
- return rows;
- }
-
- static class LocalReadRunnable extends DroppableRunnable
- {
- private final ReadCommand command;
- private final ReadCallback<ReadResponse, Row> handler;
- private final long start = System.nanoTime();
-
- LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler)
- {
- super(MessagingService.Verb.READ);
- this.command = command;
- this.handler = handler;
}
- protected void runMayThrow()
+ void maybeAwaitFullDataRead() throws ReadTimeoutException
{
+ // There wasn't a digest mismatch, we're good
+ if (repairHandler == null)
+ return;
+
+ // Otherwise, get the result from the full-data read and check that it's not a short read
try
{
- Keyspace keyspace = Keyspace.open(command.ksName);
- Row r = command.getRow(keyspace);
- ReadResponse result = ReadVerbHandler.getResponse(command, r);
- MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- handler.response(result);
+ result = repairHandler.get();
}
- catch (Throwable t)
+ catch (DigestMismatchException e)
{
- handler.onFailure(FBUtilities.getBroadcastAddress());
- if (t instanceof TombstoneOverwhelmingException)
- logger.error(t.getMessage());
+ throw new AssertionError(e); // full data requested from each node here, no digests should be sent
+ }
+ catch (ReadTimeoutException e)
+ {
+ if (Tracing.isTracing())
+ Tracing.trace("Timed out waiting on digest mismatch repair requests");
else
- logger.debug("Timed out waiting on digest mismatch repair requests");
- throw t;
++ logger.trace("Timed out waiting on digest mismatch repair requests");
+ // the caught exception here will have CL.ALL from the repair command,
+ // not whatever CL the initial command was at (CASSANDRA-7947)
+ int blockFor = consistency.blockFor(Keyspace.open(command.metadata().ksName));
+ throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
}
}
+
+ PartitionIterator getResult()
+ {
+ assert result != null;
+ return result;
+ }
}
- static class LocalRangeSliceRunnable extends DroppableRunnable
+ static class LocalReadRunnable extends DroppableRunnable
{
- private final AbstractRangeCommand command;
- private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
+ private final ReadCommand command;
+ private final ReadCallback handler;
private final long start = System.nanoTime();
- LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+ LocalReadRunnable(ReadCommand command, ReadCallback handler)
{
- super(MessagingService.Verb.RANGE_SLICE);
+ super(MessagingService.Verb.READ);
this.command = command;
this.handler = handler;
}
@@@ -1832,199 -1661,253 +1832,199 @@@
}
}
- public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
- throws UnavailableException, ReadFailureException, ReadTimeoutException
+ private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
{
- Tracing.trace("Computing ranges to query");
- long startTime = System.nanoTime();
+ private final ReadCallback handler;
+ private PartitionIterator result;
- Keyspace keyspace = Keyspace.open(command.keyspace);
- List<Row> rows;
- // now scan until we have enough results
- try
+ private SingleRangeResponse(ReadCallback handler)
{
- int liveRowCount = 0;
- boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
- rows = new ArrayList<>();
+ this.handler = handler;
+ }
- // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
- // expensive in clusters with vnodes)
- List<? extends AbstractBounds<RowPosition>> ranges;
- if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
- ranges = command.keyRange.unwrap();
- else
- ranges = getRestrictedRanges(command.keyRange);
+ private void waitForResponse() throws ReadTimeoutException
+ {
+ if (result != null)
+ return;
- // determine the number of rows to be fetched and the concurrency factor
- int rowsToBeFetched = command.limit();
- int concurrencyFactor;
- if (command.requiresScanningAllRanges())
+ try
{
- // all nodes must be queried
- rowsToBeFetched *= ranges.size();
- concurrencyFactor = ranges.size();
- logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
- command.limit(),
- ranges.size(),
- concurrencyFactor);
- Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
- ranges.size(), concurrencyFactor);
+ result = handler.get();
}
- else
+ catch (DigestMismatchException e)
{
- // our estimate of how many result rows there will be per-range
- float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
- // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
- // fetch enough rows in the first round
- resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
- concurrencyFactor = resultRowsPerRange == 0.0
- ? 1
- : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-
- logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
- resultRowsPerRange,
- command.limit(),
- ranges.size(),
- concurrencyFactor);
- Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
- ranges.size(),
- concurrencyFactor,
- resultRowsPerRange);
+ throw new AssertionError(e); // no digests in range slices yet
}
+ }
+
+ protected RowIterator computeNext()
+ {
+ waitForResponse();
+ return result.hasNext() ? result.next() : endOfData();
+ }
+
+ public void close()
+ {
+ if (result != null)
+ result.close();
+ }
+ }
+
+ private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+ {
+ private final Iterator<RangeForQuery> ranges;
+ private final int totalRangeCount;
+ private final PartitionRangeReadCommand command;
+ private final Keyspace keyspace;
+ private final ConsistencyLevel consistency;
+
+ private final long startTime;
+ private CountingPartitionIterator sentQueryIterator;
+
+ private int concurrencyFactor;
+ // The two following "metric" are maintained to improve the concurrencyFactor
+ // when it was not good enough initially.
+ private int liveReturned;
+ private int rangesQueried;
+
+ public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+ {
+ this.command = command;
+ this.concurrencyFactor = concurrencyFactor;
+ this.startTime = System.nanoTime();
+ this.ranges = new RangeMerger(ranges, keyspace, consistency);
+ this.totalRangeCount = ranges.rangeCount();
+ this.consistency = consistency;
+ this.keyspace = keyspace;
+ }
- boolean haveSufficientRows = false;
- int i = 0;
- AbstractBounds<RowPosition> nextRange = null;
- List<InetAddress> nextEndpoints = null;
- List<InetAddress> nextFilteredEndpoints = null;
- while (i < ranges.size())
+ public RowIterator computeNext()
+ {
+ while (sentQueryIterator == null || !sentQueryIterator.hasNext())
{
- List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
- int concurrentFetchStartingIndex = i;
- int concurrentRequests = 0;
- while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
+ // If we don't have more range to handle, we're done
+ if (!ranges.hasNext())
+ return endOfData();
+
+ // else, sends the next batch of concurrent queries (after having close the previous iterator)
+ if (sentQueryIterator != null)
{
- AbstractBounds<RowPosition> range = nextRange == null
- ? ranges.get(i)
- : nextRange;
- List<InetAddress> liveEndpoints = nextEndpoints == null
- ? getLiveSortedEndpoints(keyspace, range.right)
- : nextEndpoints;
- List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
- ? consistency_level.filterForQuery(keyspace, liveEndpoints)
- : nextFilteredEndpoints;
- ++i;
- ++concurrentRequests;
-
- // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
- // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
- // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
- while (i < ranges.size())
- {
- nextRange = ranges.get(i);
- nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
- nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
-
- // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
- // don't know how to deal with a wrapping range.
- // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
- // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
- // wire compatibility, so It's likely easier not to bother;
- if (range.right.isMinimum())
- break;
-
- List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
-
- // Check if there is enough endpoint for the merge to be possible.
- if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
- break;
-
- List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
-
- // Estimate whether merging will be a win or not
- if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
- break;
-
- // If we get there, merge this range and the next one
- range = range.withNewRight(nextRange.right);
- liveEndpoints = merged;
- filteredEndpoints = filteredMerged;
- ++i;
- }
+ liveReturned += sentQueryIterator.counter().counted();
+ sentQueryIterator.close();
- AbstractRangeCommand nodeCmd = command.forSubRange(range);
-
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
- List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
- handler.assureSufficientLiveNodes();
- resolver.setSources(filteredEndpoints);
- if (filteredEndpoints.size() == 1
- && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
- && OPTIMIZE_LOCAL_REQUESTS)
- {
- StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
- }
- else
- {
- MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
- for (InetAddress endpoint : filteredEndpoints)
- {
- Tracing.trace("Enqueuing request to {}", endpoint);
- MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
- }
- }
- scanHandlers.add(Pair.create(nodeCmd, handler));
+ // It's not the first batch of queries and we're not done, so we can use what has been
+ // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
+ updateConcurrencyFactor();
}
- Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
+ sentQueryIterator = sendNextRequests();
+ }
- List<AsyncOneResponse> repairResponses = new ArrayList<>();
- for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
- {
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
- RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
+ return sentQueryIterator.next();
+ }
- try
- {
- for (Row row : handler.get())
- {
- rows.add(row);
- if (countLiveRows)
- liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
- }
- repairResponses.addAll(resolver.repairResults);
- }
- catch (ReadTimeoutException|ReadFailureException ex)
- {
- // we timed out or failed waiting for responses
- int blockFor = consistency_level.blockFor(keyspace);
- int responseCount = resolver.responses.size();
- String gotData = responseCount > 0
- ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
- : "";
-
- boolean isTimeout = ex instanceof ReadTimeoutException;
- if (Tracing.isTracing())
- {
- Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
- (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
- }
- else if (logger.isDebugEnabled())
- {
- logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
- (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
- }
- throw ex;
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
- }
+ /**
+ * Recomputes concurrencyFactor, i.e. how many ranges are queried in the next batch, from the
+ * results seen so far.
+ * If no live rows have been returned yet, every remaining range is queried at once. Otherwise the
+ * average number of live rows per queried range is used to estimate how many more ranges are
+ * needed to reach command.limits().count(), clamped to between 1 and the number of ranges left.
+ */
+ private void updateConcurrencyFactor()
+ {
+ if (liveReturned == 0)
+ {
+ // we haven't actually gotten any results, so query all remaining ranges at once
+ concurrencyFactor = totalRangeCount - rangesQueried;
+ return;
+ }
- // if we're done, great, otherwise, move to the next range
- int count = countLiveRows ? liveRowCount : rows.size();
- if (count >= rowsToBeFetched)
- {
- haveSufficientRows = true;
- break;
- }
- }
+ // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+ // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+ int remainingRows = command.limits().count() - liveReturned;
+ // NOTE(review): liveReturned > 0 here, so rangesQueried is presumably non-zero (rows only come
+ // from queried ranges) and this division is safe — confirm.
+ float rowsPerRange = (float)liveReturned / (float)rangesQueried;
+ concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
- logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
++ logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+ rowsPerRange, (int) remainingRows, concurrencyFactor);
+ }
- try
- {
- FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
- }
- catch (TimeoutException ex)
- {
- // We got all responses, but timed out while repairing
- int blockFor = consistency_level.blockFor(keyspace);
- if (Tracing.isTracing())
- Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
- else
- logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
- }
+ /**
+ * Issues the read for a single sub-range and returns a SingleRangeResponse backed by its ReadCallback.
+ * The command is restricted to toQuery.range, and a DataResolver is created for all of
+ * toQuery.filteredEndpoints. If the only filtered endpoint passes canDoLocalRequest, the read runs
+ * on the local READ stage. Otherwise a message is sent to every filtered endpoint.
+ * NOTE(review): handler.assureSufficientLiveNodes() presumably throws UnavailableException when
+ * too few replicas are alive — confirm in ReadCallback.
+ */
+ private SingleRangeResponse query(RangeForQuery toQuery)
+ {
+ PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+
+ DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+
+ int blockFor = consistency.blockFor(keyspace);
+ int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
+ // The callback is built over only the first minResponses endpoints (presumably the ones it
+ // blocks for), while the requests below still go to all filtered endpoints.
+ List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
+ ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
- if (haveSufficientRows)
- return command.postReconciliationProcessing(rows);
+ handler.assureSufficientLiveNodes();
- // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
- // based on the results we've seen so far (as long as we still have ranges left to query)
- if (i < ranges.size())
+ if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
+ {
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler), Tracing.instance.get());
+ }
+ else
+ {
+ for (InetAddress endpoint : toQuery.filteredEndpoints)
 {
- float fetchedRows = countLiveRows ? liveRowCount : rows.size();
- float remainingRows = rowsToBeFetched - fetchedRows;
- float actualRowsPerRange;
- if (fetchedRows == 0.0)
- {
- // we haven't actually gotten any results, so query all remaining ranges at once
- actualRowsPerRange = 0.0f;
- concurrencyFactor = ranges.size() - i;
- }
- else
- {
- actualRowsPerRange = fetchedRows / i;
- concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
- }
- logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
- actualRowsPerRange, (int) remainingRows, concurrencyFactor);
+ // the message is created per endpoint, using that endpoint's messaging version
+ MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
+ Tracing.trace("Enqueuing request to {}", endpoint);
+ MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 }
 }
+
+ return new SingleRangeResponse(handler);
 }
- finally
+
+ /**
+ * Queries up to concurrencyFactor of the remaining ranges, incrementing rangesQueried for each.
+ * Returns their results concatenated into one iterator and wrapped in a CountingPartitionIterator.
+ */
+ private CountingPartitionIterator sendNextRequests()
+ {
+ List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+ for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+ {
+ concurrentQueries.add(query(ranges.next()));
+ ++rangesQueried;
+ }
+
+ Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+ // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
+ // enforce any particular limit at this point (this could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
+ return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec());
+ }
+
+ /**
+ * Closes the iterator for the batch currently being read, if any.
+ * Always records the time elapsed since startTime in rangeMetrics and in the table's
+ * coordinatorScanLatency metric.
+ */
+ public void close()
 {
- long latency = System.nanoTime() - startTime;
- rangeMetrics.addNano(latency);
- Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+ try
+ {
+ if (sentQueryIterator != null)
+ sentQueryIterator.close();
+ }
+ finally
+ {
+ // latency is recorded even if closing the in-flight iterator throws
+ long latency = System.nanoTime() - startTime;
+ rangeMetrics.addNano(latency);
+ Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+ }
 }
- return command.postReconciliationProcessing(rows);
+ }
+
+ /**
+ * Executes a partition range read at the given consistency level.
+ * A RangeIterator supplies the ranges to query. The initial number of concurrently queried ranges
+ * is derived from the command's row limit and the estimated rows per range. That estimate is
+ * deliberately reduced by CONCURRENT_SUBREQUESTS_MARGIN. The returned iterator applies
+ * postReconciliationProcessing and then the command's limits across all ranges.
+ *
+ * @param command the partition range read to execute
+ * @param consistencyLevel the consistency level used for every sub-range read
+ * @return the reconciled, limited results. NOTE(review): presumably the caller must close this
+ * iterator (the "resource" warning is suppressed for it) — confirm.
+ * @throws UnavailableException declared for when replicas are unavailable
+ * @throws ReadFailureException declared for read failures
+ * @throws ReadTimeoutException declared for read timeouts
+ */
+ @SuppressWarnings("resource")
+ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
+ throws UnavailableException, ReadFailureException, ReadTimeoutException
+ {
+ Tracing.trace("Computing ranges to query");
+
+ Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+ RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
+
+ // our estimate of how many result rows there will be per-range
+ float resultsPerRange = estimateResultsPerRange(command, keyspace);
+ // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+ // fetch enough rows in the first round
+ resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+ // with no per-range estimate, start by querying a single range
+ int concurrencyFactor = resultsPerRange == 0.0
+ ? 1
+ : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
- logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
++ logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+ resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
+ Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
+
+ // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+ // That is why the result is wrapped in command.limits().filter(...) below, which applies the limit across all ranges.
+
+ return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
+ }
public Map<String, List<String>> getSchemaVersions()
@@@ -2430,52 -2287,6 +2430,52 @@@
logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report");
}
+ /**
+ * Returns the in-progress hint counter for the given destination, obtained from hintsInProgress.load.
+ * Any exception thrown by the load is rethrown wrapped in an AssertionError.
+ * NOTE(review): hintsInProgress is presumably a loading cache that creates counters on demand — confirm.
+ */
+ private static AtomicInteger getHintsInProgressFor(InetAddress destination)
+ {
+ try
+ {
+ return hintsInProgress.load(destination);
+ }
+ catch (Exception e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ /**
+ * Single-target convenience overload. It wraps target in a singleton collection and delegates to
+ * the overload that accepts a Collection of targets.
+ */
+ public static Future<Void> submitHint(Mutation mutation, InetAddress target, AbstractWriteResponseHandler<IMutation> responseHandler)
+ {
+ return submitHint(mutation, Collections.singleton(target), responseHandler);
+ }
+
+ /**
+ * Submits a task that writes a hint for the mutation addressed to the given targets.
+ * When run, the task does the following:
+ * - maps the targets to host ids;
+ * - writes one Hint stamped with the current time;
+ * - increments the created-hints metric once per target;
+ * - notifies responseHandler, only when it is non-null and its consistency level is ANY.
+ *
+ * @return the future for the submitted hint task
+ */
+ public static Future<Void> submitHint(Mutation mutation,
+ Collection<InetAddress> targets,
+ AbstractWriteResponseHandler<IMutation> responseHandler)
+ {
+ HintRunnable runnable = new HintRunnable(targets)
+ {
+ public void runMayThrow()
+ {
- logger.debug("Adding hints for {}", targets);
++ logger.trace("Adding hints for {}", targets);
+ HintsService.instance.write(Iterables.transform(targets, StorageService.instance::getHostIdForEndpoint),
+ Hint.create(mutation, System.currentTimeMillis()));
+ targets.forEach(HintsService.instance.metrics::incrCreatedHints);
+ // Notify the handler only for CL == ANY
+ if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+ responseHandler.response(null);
+ }
+ };
+
+ return submitHint(runnable);
+ }
+
+ /**
+ * Increments the global in-progress hint counter by the number of targets and each target's own
+ * in-progress counter. It then submits the runnable to the MUTATION stage.
+ * NOTE(review): the (Future<Void>) cast is unchecked. The matching counter decrements are not
+ * visible here; presumably HintRunnable performs them when it completes — confirm.
+ */
+ private static Future<Void> submitHint(HintRunnable runnable)
+ {
+ StorageMetrics.totalHintsInProgress.inc(runnable.targets.size());
+ for (InetAddress target : runnable.targets)
+ getHintsInProgressFor(target).incrementAndGet();
+ return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
+ }
+
+ /** Returns the RPC timeout held by DatabaseDescriptor (milliseconds, per the setter's parameter name). */
 public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); }
+ /** Stores the given RPC timeout, in milliseconds, in DatabaseDescriptor. */
 public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java
index 7017bc1,1abc928..538d128
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@@ -688,10 -538,10 +688,10 @@@ public class CassandraServer implement
// request by page if this is a large row
if (cfs.getMeanColumns() > 0)
{
- int averageColumnSize = (int) (cfs.metric.meanRowSize.getValue() / cfs.getMeanColumns());
+ int averageColumnSize = (int) (cfs.metric.meanPartitionSize.getValue() / cfs.getMeanColumns());
pageSize = Math.min(COUNT_PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize);
pageSize = Math.max(2, pageSize);
- logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+ logger.trace("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
}
else
{
@@@ -1866,9 -1492,10 +1866,9 @@@
requestScheduler.release();
}
- public String system_add_column_family(CfDef cf_def)
- throws InvalidRequestException, SchemaDisagreementException, TException
+ public String system_add_column_family(CfDef cf_def) throws TException
{
- logger.debug("add_column_family");
+ logger.trace("add_column_family");
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------