You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/01/13 20:48:59 UTC
[08/13] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94e7ef17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94e7ef17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94e7ef17
Branch: refs/heads/cassandra-3.0
Commit: 94e7ef17757235cb35df70ff9a2a63e1d29d6c41
Parents: 0f995a2 dbf6e62
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jan 13 14:45:13 2016 -0500
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Wed Jan 13 14:45:13 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../AbstractLocalAwareExecutorService.java | 230 +++++++++++++++++++
.../AbstractTracingAwareExecutorService.java | 230 -------------------
.../DebuggableThreadPoolExecutor.java | 48 ++--
.../cassandra/concurrent/ExecutorLocal.java | 44 ++++
.../cassandra/concurrent/ExecutorLocals.java | 84 +++++++
.../concurrent/LocalAwareExecutorService.java | 34 +++
.../cassandra/concurrent/SEPExecutor.java | 3 +-
.../concurrent/SharedExecutorPool.java | 2 +-
.../cassandra/concurrent/StageManager.java | 12 +-
.../concurrent/TracingAwareExecutorService.java | 36 ---
.../cassandra/cql3/functions/UDFunction.java | 2 +-
.../cql3/statements/BatchStatement.java | 9 +-
.../cql3/statements/CreateViewStatement.java | 2 +-
.../cql3/statements/SelectStatement.java | 4 +-
.../org/apache/cassandra/db/ReadCommand.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 7 +-
.../apache/cassandra/service/ClientWarn.java | 62 +++--
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../org/apache/cassandra/tracing/Tracing.java | 3 +-
.../org/apache/cassandra/transport/Message.java | 6 +-
.../transport/RequestThreadPoolExecutor.java | 4 +-
.../cql3/validation/entities/UFTest.java | 6 +-
.../cassandra/service/ClientWarningsTest.java | 58 +++++
24 files changed, 545 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 614d5b4,6530956..a37ec99
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
-2.2.5
+3.0.3
+ * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
+ * Fix UnsupportedOperationException when reading old sstable with range
+ tombstone (CASSANDRA-10743)
+ * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
+ * Fix potential assertion error during compaction (CASSANDRA-10944)
+ * Fix counting of received sstables in streaming (CASSANDRA-10949)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
+ * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
+ * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
+ * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
+ * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
+ * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
+Merged from 2.2:
+ * Make sure client gets tombstone overwhelmed warning (CASSANDRA-9465)
* Fix error streaming section more than 2GB (CASSANDRA-10961)
* (cqlsh) Also apply --connect-timeout to control connection
timeout (CASSANDRA-10959)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
index 0000000,088b43e..f47d8ac
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
+++ b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
@@@ -1,0 -1,229 +1,230 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.concurrent;
+
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.cassandra.tracing.TraceState;
+ import org.apache.cassandra.tracing.Tracing;
+ import org.apache.cassandra.utils.concurrent.SimpleCondition;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+
+ import static org.apache.cassandra.tracing.Tracing.isTracing;
+
+ public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService
+ {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractLocalAwareExecutorService.class);
+
+ protected abstract void addTask(FutureTask<?> futureTask);
+ protected abstract void onCompletion();
+
+ /** Task Submission / Creation / Objects **/
+
+ public <T> FutureTask<T> submit(Callable<T> task)
+ {
+ return submit(newTaskFor(task));
+ }
+
+ public FutureTask<?> submit(Runnable task)
+ {
+ return submit(newTaskFor(task, null));
+ }
+
+ public <T> FutureTask<T> submit(Runnable task, T result)
+ {
+ return submit(newTaskFor(task, result));
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
+ {
+ return newTaskFor(runnable, result, ExecutorLocals.create());
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals)
+ {
+ if (locals != null)
+ {
+ if (runnable instanceof LocalSessionFutureTask)
+ return (LocalSessionFutureTask<T>) runnable;
+ return new LocalSessionFutureTask<T>(runnable, result, locals);
+ }
+ if (runnable instanceof FutureTask)
+ return (FutureTask<T>) runnable;
+ return new FutureTask<>(runnable, result);
+ }
+
+ protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
+ {
+ if (isTracing())
+ {
+ if (callable instanceof LocalSessionFutureTask)
+ return (LocalSessionFutureTask<T>) callable;
+ return new LocalSessionFutureTask<T>(callable, ExecutorLocals.create());
+ }
+ if (callable instanceof FutureTask)
+ return (FutureTask<T>) callable;
+ return new FutureTask<>(callable);
+ }
+
+ private class LocalSessionFutureTask<T> extends FutureTask<T>
+ {
+ private final ExecutorLocals locals;
+
+ public LocalSessionFutureTask(Callable<T> callable, ExecutorLocals locals)
+ {
+ super(callable);
+ this.locals = locals;
+ }
+
+ public LocalSessionFutureTask(Runnable runnable, T result, ExecutorLocals locals)
+ {
+ super(runnable, result);
+ this.locals = locals;
+ }
+
+ public void run()
+ {
+ ExecutorLocals old = ExecutorLocals.create();
+ ExecutorLocals.set(locals);
+ try
+ {
+ super.run();
+ }
+ finally
+ {
+ ExecutorLocals.set(old);
+ }
+ }
+ }
+
+ class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
+ {
+ private boolean failure;
+ private Object result = this;
+ private final Callable<T> callable;
+
+ public FutureTask(Callable<T> callable)
+ {
+ this.callable = callable;
+ }
+ public FutureTask(Runnable runnable, T result)
+ {
+ this(Executors.callable(runnable, result));
+ }
+
+ public void run()
+ {
+ try
+ {
+ result = callable.call();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
+ result = t;
+ failure = true;
+ }
+ finally
+ {
+ signalAll();
+ onCompletion();
+ }
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ public boolean isDone()
+ {
+ return isSignaled();
+ }
+
+ public T get() throws InterruptedException, ExecutionException
+ {
+ await();
+ Object result = this.result;
+ if (failure)
+ throw new ExecutionException((Throwable) result);
+ return (T) result;
+ }
+
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
- await(timeout, unit);
++ if (!await(timeout, unit))
++ throw new TimeoutException();
+ Object result = this.result;
+ if (failure)
+ throw new ExecutionException((Throwable) result);
+ return (T) result;
+ }
+ }
+
+ private <T> FutureTask<T> submit(FutureTask<T> task)
+ {
+ addTask(task);
+ return task;
+ }
+
+ public void execute(Runnable command)
+ {
+ addTask(newTaskFor(command, ExecutorLocals.create()));
+ }
+
+ public void execute(Runnable command, ExecutorLocals locals)
+ {
+ addTask(newTaskFor(command, null, locals));
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 04a4c3d,1e5cea6..fa0d306
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -269,164 -143,11 +269,164 @@@ public abstract class UDFunction extend
return null;
long tStart = System.nanoTime();
- ByteBuffer result = executeUserDefined(protocolVersion, parameters);
- Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
- return result;
+ parameters = makeEmptyParametersNull(parameters);
+
+ try
+ {
+ // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr).
+ ByteBuffer result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
+ ? executeAsync(protocolVersion, parameters)
+ : executeUserDefined(protocolVersion, parameters);
+
+ Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
+ return result;
+ }
+ catch (InvalidRequestException e)
+ {
+ throw e;
+ }
+ catch (Throwable t)
+ {
+ logger.trace("Invocation of user-defined function '{}' failed", this, t);
+ if (t instanceof VirtualMachineError)
+ throw (VirtualMachineError) t;
+ throw FunctionExecutionException.create(this, t);
+ }
}
+ public static void assertUdfsEnabled(String language)
+ {
+ if (!DatabaseDescriptor.enableUserDefinedFunctions())
+ throw new InvalidRequestException("User-defined functions are disabled in cassandra.yaml - set enable_user_defined_functions=true to enable");
+ if (!"java".equalsIgnoreCase(language) && !DatabaseDescriptor.enableScriptedUserDefinedFunctions())
+ throw new InvalidRequestException("Scripted user-defined functions are disabled in cassandra.yaml - set enable_scripted_user_defined_functions=true to enable if you are aware of the security risks");
+ }
+
+ static void initializeThread()
+ {
+ // Get the TypeCodec stuff in Java Driver initialized.
+ // This is to get the classes loaded outside of the restricted sandbox's security context of a UDF.
+ UDHelper.codecFor(DataType.inet()).format(InetAddress.getLoopbackAddress());
+ UDHelper.codecFor(DataType.ascii()).format("");
+ }
+
+ private static final class ThreadIdAndCpuTime extends CompletableFuture<Object>
+ {
+ long threadId;
+ long cpuTime;
+
+ ThreadIdAndCpuTime()
+ {
+ // Looks weird?
+ // This call "just" links this class to java.lang.management - otherwise UDFs (script UDFs) might fail due to
+ // java.security.AccessControlException: access denied: ("java.lang.RuntimePermission" "accessClassInPackage.java.lang.management")
+ // because class loading would be deferred until setup() is executed - but setup() is called with
+ // limited privileges.
+ threadMXBean.getCurrentThreadCpuTime();
+ }
+
+ void setup()
+ {
+ this.threadId = Thread.currentThread().getId();
+ this.cpuTime = threadMXBean.getCurrentThreadCpuTime();
+ complete(null);
+ }
+ }
+
+ private ByteBuffer executeAsync(int protocolVersion, List<ByteBuffer> parameters)
+ {
+ ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
+
+ Future<ByteBuffer> future = executor().submit(() -> {
+ threadIdAndCpuTime.setup();
+ return executeUserDefined(protocolVersion, parameters);
+ });
+
+ try
+ {
+ if (DatabaseDescriptor.getUserDefinedFunctionWarnTimeout() > 0)
+ try
+ {
+ return future.get(DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+
+ // log and emit a warning that UDF execution took long
+ String warn = String.format("User defined function %s ran longer than %dms", this, DatabaseDescriptor.getUserDefinedFunctionWarnTimeout());
+ logger.warn(warn);
- ClientWarn.warn(warn);
++ ClientWarn.instance.warn(warn);
+ }
+
+ // retry with difference of warn-timeout to fail-timeout
+ return future.get(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable c = e.getCause();
+ if (c instanceof RuntimeException)
+ throw (RuntimeException) c;
+ throw new RuntimeException(c);
+ }
+ catch (TimeoutException e)
+ {
+ // retry a last time with the difference of UDF-fail-timeout to consumed CPU time (just in case execution hit a badly timed GC)
+ try
+ {
+ //The threadIdAndCpuTime shouldn't take a long time to be set so this should return immediately
+ threadIdAndCpuTime.get(1, TimeUnit.SECONDS);
+
+ long cpuTimeMillis = threadMXBean.getThreadCpuTime(threadIdAndCpuTime.threadId) - threadIdAndCpuTime.cpuTime;
+ cpuTimeMillis /= 1000000L;
+
+ return future.get(Math.max(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - cpuTimeMillis, 0L),
+ TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e1)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e1)
+ {
+ Throwable c = e.getCause();
+ if (c instanceof RuntimeException)
+ throw (RuntimeException) c;
+ throw new RuntimeException(c);
+ }
+ catch (TimeoutException e1)
+ {
+ TimeoutException cause = new TimeoutException(String.format("User defined function %s ran longer than %dms%s",
+ this,
+ DatabaseDescriptor.getUserDefinedFunctionFailTimeout(),
+ DatabaseDescriptor.getUserFunctionTimeoutPolicy() == Config.UserFunctionTimeoutPolicy.ignore
+ ? "" : " - will stop Cassandra VM"));
+ FunctionExecutionException fe = FunctionExecutionException.create(this, cause);
+ JVMStabilityInspector.userFunctionTimeout(cause);
+ throw fe;
+ }
+ }
+ }
+
+ private List<ByteBuffer> makeEmptyParametersNull(List<ByteBuffer> parameters)
+ {
+ List<ByteBuffer> r = new ArrayList<>(parameters.size());
+ for (int i = 0; i < parameters.size(); i++)
+ {
+ ByteBuffer param = parameters.get(i);
+ r.add(UDHelper.isNullOrEmpty(argTypes.get(i), param)
+ ? null : param);
+ }
+ return r;
+ }
+
+ protected abstract ExecutorService executor();
+
public boolean isCallableWrtNullable(List<ByteBuffer> parameters)
{
if (!calledOnNullInput)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 3979597,a289ad1..47396fb
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -226,35 -179,67 +226,35 @@@ public class BatchStatement implements
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
+ if (isLogged() && statement.cfm.params.gcGraceSeconds == 0)
+ {
+ if (tablesWithZeroGcGs == null)
+ tablesWithZeroGcGs = new HashSet<>();
+ tablesWithZeroGcGs.add(String.format("%s.%s", statement.cfm.ksName, statement.cfm.cfName));
+ }
QueryOptions statementOptions = options.forStatement(i);
long timestamp = attrs.getTimestamp(now, statementOptions);
- addStatementMutations(statement, statementOptions, local, timestamp, mutations);
+ statement.addUpdates(collector, statementOptions, local, timestamp);
}
- return unzipMutations(mutations);
- }
-
- private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
- {
-
- // The case where all statement where on the same keyspace is pretty common
- if (mutations.size() == 1)
- return mutations.values().iterator().next().values();
-
- List<IMutation> ms = new ArrayList<>();
- for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
- ms.addAll(ksMap.values());
-
- return ms;
- }
-
- private void addStatementMutations(ModificationStatement statement,
- QueryOptions options,
- boolean local,
- long now,
- Map<String, Map<ByteBuffer, IMutation>> mutations)
- throws RequestExecutionException, RequestValidationException
- {
- String ksName = statement.keyspace();
- Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
- if (ksMap == null)
+ if (tablesWithZeroGcGs != null)
{
- ksMap = new HashMap<>();
- mutations.put(ksName, ksMap);
+ String suffix = tablesWithZeroGcGs.size() == 1 ? "" : "s";
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, LOGGED_BATCH_LOW_GCGS_WARNING,
+ suffix, tablesWithZeroGcGs);
- ClientWarn.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
- .getMessage());
++ ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
++ .getMessage());
}
- // The following does the same than statement.getMutations(), but we inline it here because
- // we don't want to recreate mutations every time as this is particularly inefficient when applying
- // multiple batch to the same partition (see #6737).
- List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
- Composite clusteringPrefix = statement.createClusteringPrefix(options);
- UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
-
- for (ByteBuffer key : keys)
- {
- IMutation mutation = ksMap.get(key);
- Mutation mut;
- if (mutation == null)
- {
- mut = new Mutation(ksName, key);
- mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
- ksMap.put(key, mutation);
- }
- else
- {
- mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
- }
+ collector.validateIndexedColumns();
+ return collector.toMutations();
+ }
- statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
- }
+ private int updatedRows()
+ {
+ // Note: it's possible for 2 statements to actually apply to the same row, but that's just an estimation
+ // for sizing our PartitionUpdate backing array, so it's good enough.
+ return statements.size();
}
/**
@@@ -286,9 -271,9 +286,9 @@@
}
else if (logger.isWarnEnabled())
{
- logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
+ logger.warn(format, tableNames, size, warnThreshold, size - warnThreshold, "");
}
- ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage());
- ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[]{ ksCfPairs, size, warnThreshold, size - warnThreshold, "" }).getMessage());
++ ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage());
}
}
@@@ -311,17 -298,21 +311,16 @@@
}
// CASSANDRA-9303: If we only have local mutations we do not warn
- if (localMutationsOnly)
+ if (localPartitionsOnly)
return;
- NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
- keySet.size(), keySet.size() == 1 ? "" : "s",
- ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
- ClientWarn.instance.warn(MessageFormatter.arrayFormat(unloggedBatchWarning,
- new Object[]{
- keySet.size(),
- keySet.size() == 1 ? "" : "s",
- ksCfPairs.size() == 1 ? "" : "s",
- ksCfPairs
- }).getMessage());
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
+ keySet.size(), keySet.size() == 1 ? "" : "s",
+ tableNames.size() == 1 ? "" : "s", tableNames);
- ClientWarn.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
++ ClientWarn.instance.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
+ tableNames.size() == 1 ? "" : "s", tableNames}).getMessage());
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 4017ce6,0000000..5af4887
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@@ -1,330 -1,0 +1,330 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.selection.RawSelector;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+public class CreateViewStatement extends SchemaAlteringStatement
+{
+ private final CFName baseName;
+ private final List<RawSelector> selectClause;
+ private final WhereClause whereClause;
+ private final List<ColumnIdentifier.Raw> partitionKeys;
+ private final List<ColumnIdentifier.Raw> clusteringKeys;
+ public final CFProperties properties = new CFProperties();
+ private final boolean ifNotExists;
+
+ public CreateViewStatement(CFName viewName,
+ CFName baseName,
+ List<RawSelector> selectClause,
+ WhereClause whereClause,
+ List<ColumnIdentifier.Raw> partitionKeys,
+ List<ColumnIdentifier.Raw> clusteringKeys,
+ boolean ifNotExists)
+ {
+ super(viewName);
+ this.baseName = baseName;
+ this.selectClause = selectClause;
+ this.whereClause = whereClause;
+ this.partitionKeys = partitionKeys;
+ this.clusteringKeys = clusteringKeys;
+ this.ifNotExists = ifNotExists;
+ }
+
+
+ public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+ {
+ if (!baseName.hasKeyspace())
+ baseName.setKeyspace(keyspace(), true);
+ state.hasColumnFamilyAccess(keyspace(), baseName.getColumnFamily(), Permission.ALTER);
+ }
+
+ public void validate(ClientState state) throws RequestValidationException
+ {
+ // We do validation in announceMigration to reduce doubling up of work
+ }
+
+ private interface AddColumn {
+ void add(ColumnIdentifier identifier, AbstractType<?> type);
+ }
+
+ private void add(CFMetaData baseCfm, Iterable<ColumnIdentifier> columns, AddColumn adder)
+ {
+ for (ColumnIdentifier column : columns)
+ {
+ AbstractType<?> type = baseCfm.getColumnDefinition(column).type;
+ if (properties.definedOrdering.containsKey(column))
+ {
+ boolean desc = properties.definedOrdering.get(column);
+ if (!desc && type.isReversed())
+ {
+ type = ((ReversedType)type).baseType;
+ }
+ else if (desc && !type.isReversed())
+ {
+ type = ReversedType.getInstance(type);
+ }
+ }
+ adder.add(column, type);
+ }
+ }
+
+ public Event.SchemaChange announceMigration(boolean isLocalOnly) throws RequestValidationException
+ {
+ // We need to make sure that:
+ // - primary key includes all columns in base table's primary key
+ // - make sure that the select statement does not have anything other than columns
+ // and their names match the base table's names
+ // - make sure that primary key does not include any collections
+ // - make sure there is no where clause in the select statement
+ // - make sure there is not currently a table or view
+ // - make sure baseTable gcGraceSeconds > 0
+
+ properties.validate();
+
+ if (properties.useCompactStorage)
+ throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");
+
+ // We enforce the keyspace because if the RF is different, the logic to wait for a
+ // specific replica would break
+ if (!baseName.getKeyspace().equals(keyspace()))
+ throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace");
+
+ CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());
+
+ if (cfm.isCounter())
+ throw new InvalidRequestException("Materialized views are not supported on counter tables");
+ if (cfm.isView())
+ throw new InvalidRequestException("Materialized views cannot be created against other materialized views");
+
+ if (cfm.params.gcGraceSeconds == 0)
+ {
+ throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " +
+ "'%s' with gc_grace_seconds of 0, since this value is " +
+ "used to TTL undelivered updates. Setting gc_grace_seconds" +
+ " too low might cause undelivered updates to expire " +
+ "before being replayed.", cfName.getColumnFamily(),
+ baseName.getColumnFamily()));
+ }
+
+ Set<ColumnIdentifier> included = new HashSet<>();
+ for (RawSelector selector : selectClause)
+ {
+ Selectable.Raw selectable = selector.selectable;
+ if (selectable instanceof Selectable.WithFieldSelection.Raw)
+ throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view");
+ if (selectable instanceof Selectable.WithFunction.Raw)
+ throw new InvalidRequestException("Cannot use function when defining a materialized view");
+ if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
+ throw new InvalidRequestException("Cannot use function when defining a materialized view");
+ ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm);
+ if (selector.alias != null)
+ throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString()));
+
+ ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
+
+ if (cdef == null)
+ throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+ if (cdef.isStatic())
- ClientWarn.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
++ ClientWarn.instance.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
+ else
+ included.add(identifier);
+ }
+
+ Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
+ for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
+ {
+ if (!targetPrimaryKeys.add(identifier))
+ throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);
+
+ ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm));
+
+ if (cdef == null)
+ throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+ if (cfm.getColumnDefinition(identifier.prepare(cfm)).type.isMultiCell())
+ throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
+
+ if (cdef.isStatic())
+ throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));
+ }
+
+ // build the select statement
+ Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap();
+ SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false);
+ SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null);
+
+ ClientState state = ClientState.forInternalCalls();
+ state.setKeyspace(keyspace());
+
+ rawSelect.prepareKeyspace(state);
+ rawSelect.setBoundVariables(getBoundVariables());
+
+ ParsedStatement.Prepared prepared = rawSelect.prepare(true);
+ SelectStatement select = (SelectStatement) prepared.statement;
+ StatementRestrictions restrictions = select.getRestrictions();
+
+ if (!prepared.boundNames.isEmpty())
+ throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements");
+
+ if (!restrictions.nonPKRestrictedColumns(false).isEmpty())
+ {
+ throw new InvalidRequestException(String.format(
+ "Non-primary key columns cannot be restricted in the SELECT statement used for materialized view " +
+ "creation (got restrictions on: %s)",
+ restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", "))));
+ }
+
+ String whereClauseText = View.relationsToWhereClause(whereClause.relations);
+
+ Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
+ for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
+ basePrimaryKeyCols.add(definition.name);
+
+ List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
+ List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();
+
+ // This is only used as an intermediate state; this is to catch whether multiple non-PK columns are used
+ boolean hasNonPKColumn = false;
+ for (ColumnIdentifier.Raw raw : partitionKeys)
+ hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions);
+
+ for (ColumnIdentifier.Raw raw : clusteringKeys)
+ hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, restrictions);
+
+ // We need to include all of the primary key columns from the base table in order to make sure that we do not
+ // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
+ // the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
+ // used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
+ // that they include all of the columns. We provide them with a list of all of the columns left to include.
+ boolean missingClusteringColumns = false;
+ StringBuilder columnNames = new StringBuilder();
+ List<ColumnIdentifier> includedColumns = new ArrayList<>();
+ for (ColumnDefinition def : cfm.allColumns())
+ {
+ ColumnIdentifier identifier = def.name;
+
+ if ((included.isEmpty() || included.contains(identifier))
+ && !targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier)
+ && !def.isStatic())
+ {
+ includedColumns.add(identifier);
+ }
+ if (!def.isPrimaryKeyColumn()) continue;
+
+ if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
+ {
+ if (missingClusteringColumns)
+ columnNames.append(',');
+ else
+ missingClusteringColumns = true;
+ columnNames.append(identifier);
+ }
+ }
+ if (missingClusteringColumns)
+ throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
+ columnFamily(), baseName.getColumnFamily(), columnNames.toString()));
+
+ if (targetPartitionKeys.isEmpty())
+ throw new InvalidRequestException("Must select at least a column for a Materialized View");
+
+ if (targetClusteringColumns.isEmpty())
+ throw new InvalidRequestException("No columns are defined for Materialized View other than primary key");
+
+ CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily());
+ add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey);
+ add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn);
+ add(cfm, includedColumns, cfmBuilder::addRegularColumn);
+ cfmBuilder.withId(properties.properties.getId());
+ TableParams params = properties.properties.asNewTableParams();
+ CFMetaData viewCfm = cfmBuilder.build().params(params);
+ ViewDefinition definition = new ViewDefinition(keyspace(),
+ columnFamily(),
+ Schema.instance.getId(keyspace(), baseName.getColumnFamily()),
+ baseName.getColumnFamily(),
+ included.isEmpty(),
+ rawSelect,
+ whereClauseText,
+ viewCfm);
+
+ try
+ {
+ MigrationManager.announceNewView(definition, isLocalOnly);
+ return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+ }
+ catch (AlreadyExistsException e)
+ {
+ if (ifNotExists)
+ return null;
+ throw e;
+ }
+ }
+
+ private static boolean getColumnIdentifier(CFMetaData cfm,
+ Set<ColumnIdentifier> basePK,
+ boolean hasNonPKColumn,
+ ColumnIdentifier.Raw raw,
+ List<ColumnIdentifier> columns,
+ StatementRestrictions restrictions)
+ {
+ ColumnIdentifier identifier = raw.prepare(cfm);
+ ColumnDefinition def = cfm.getColumnDefinition(identifier);
+
+ boolean isPk = basePK.contains(identifier);
+ if (!isPk && hasNonPKColumn)
+ throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view partition key", identifier));
+
+ // We don't need to include the "IS NOT NULL" filter on a non-composite partition key
+ // because we will never allow a single partition key to be NULL
+ boolean isSinglePartitionKey = cfm.getColumnDefinition(identifier).isPartitionKey()
+ && cfm.partitionKeyColumns().size() == 1;
+ if (!isSinglePartitionKey && !restrictions.isRestricted(def))
+ throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", identifier));
+
+ columns.add(identifier);
+ return !isPk;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a9bb121,5cfa94b..904adca
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -359,21 -256,19 +359,21 @@@ public class SelectStatement implement
else if (restrictions.keyIsInRelation())
{
logger.warn("Aggregation query used on multiple partition keys (IN restriction)");
- ClientWarn.warn("Aggregation query used on multiple partition keys (IN restriction)");
+ ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)");
}
- Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
+ Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
while (!pager.isExhausted())
{
- for (Row row : pager.fetchPage(pageSize))
+ try (PartitionIterator iter = pager.fetchPage(pageSize))
{
- // Not columns match the query, skip
- if (row.cf == null)
- continue;
-
- processColumnFamily(row.key.getKey(), row.cf, options, now, result);
+ while (iter.hasNext())
+ {
+ try (RowIterator partition = iter.next())
+ {
+ processPartition(partition, options, result, nowInSec);
+ }
+ }
}
}
return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 3f0695c,cd86336..668a189
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -232,713 -95,55 +232,713 @@@ public abstract class ReadCommand imple
return this;
}
- public String getColumnFamilyName()
+ /**
+ * Sets the digest version, for when digest for that command is requested.
+ * <p>
+ * Note that we allow setting this independently of setting the command as a digest query as
+ * this allows us to use the command as a carrier of the digest version even if we only call
+ * setIsDigestQuery on some copy of it.
+ *
+ * @param digestVersion the version to use for the digest if this command is used for a digest query.
+ * @return this read command.
+ */
+ public ReadCommand setDigestVersion(int digestVersion)
{
- return cfName;
+ this.digestVersion = digestVersion;
+ return this;
}
+ /**
+ * Whether this query is for thrift or not.
+ *
+ * @return whether this query is for thrift.
+ */
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ /**
+ * The clustering index filter this command uses for the provided key.
+ * <p>
+ * Note that this method should only be called on a key actually queried by this command
+ * and in practice, this will almost always return the same filter, but for the sake of
+ * paging, the filter on the first key of a range command might be slightly different.
+ *
+ * @param key a partition key queried by this command.
+ *
+ * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
+ */
+ public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
+
+ /**
+ * Returns a copy of this command.
+ *
+ * @return a copy of this command.
+ */
public abstract ReadCommand copy();
- public abstract Row getRow(Keyspace keyspace);
+ protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
+
+ protected abstract int oldestUnrepairedTombstone();
+
+ public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
+ {
+ return isDigestQuery()
+ ? ReadResponse.createDigestResponse(iterator, digestVersion)
+ : ReadResponse.createDataResponse(iterator, selection);
+ }
+
+ public long indexSerializedSize(int version)
+ {
+ if (index.isPresent())
+ return IndexMetadata.serializer.serializedSize(index.get(), version);
+ else
+ return 0;
+ }
+
+ public Index getIndex(ColumnFamilyStore cfs)
+ {
+ // if we've already consulted the index manager, and it returned a valid index
+ // the result should be cached here.
+ if(index.isPresent())
+ return cfs.indexManager.getIndex(index.get());
+
+ // if no cached index is present, but we've already consulted the index manager
+ // then no registered index is suitable for this command, so just return null.
+ if (indexManagerQueried)
+ return null;
+
+ // do the lookup, set the flag to indicate so and cache the result if not null
+ Index selected = cfs.indexManager.getBestIndexFor(this);
+ indexManagerQueried = true;
- public abstract IDiskAtomFilter filter();
+ if (selected == null)
+ return null;
- public String getKeyspace()
+ index = Optional.of(selected.getIndexMetadata());
+ return selected;
+ }
+
+ /**
+ * Executes this command on the local host.
+ *
+ * @param orderGroup the operation group spanning this command
+ *
+ * @return an iterator over the result of executing this command locally.
+ */
+ @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
+ // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
+ public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
{
- return ksName;
+ long startTimeNanos = System.nanoTime();
+
+ ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
+ Index index = getIndex(cfs);
+
+ Index.Searcher searcher = null;
+ if (index != null)
+ {
+ if (!cfs.indexManager.isIndexQueryable(index))
+ throw new IndexNotAvailableException(index);
+
+ searcher = index.searcherFor(this);
+ Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
+ }
+
+ UnfilteredPartitionIterator resultIterator = searcher == null
+ ? queryStorage(cfs, orderGroup)
+ : searcher.search(orderGroup);
+
+ try
+ {
+ resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
+
+ // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
+ // no point in checking it again.
+ RowFilter updatedFilter = searcher == null
+ ? rowFilter()
+ : index.getPostIndexQueryFilter(rowFilter());
+
+ // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+ // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
+ // would be more efficient (the sooner we discard stuff we know we don't care, the less useless
+ // processing we do on it).
+ return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
+ }
+ catch (RuntimeException | Error e)
+ {
+ resultIterator.close();
+ throw e;
+ }
}
- // maybeGenerateRetryCommand is used to generate a retry for short reads
- public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
+ protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
+
+ public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
{
- return null;
+ return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
}
- // maybeTrim removes columns from a response that is too long
- public Row maybeTrim(Row row)
+ public ReadOrderGroup startOrderGroup()
{
- return row;
+ return ReadOrderGroup.forCommand(this);
}
- public long getTimeout()
+ /**
+ * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
+ * This also logs a warning or throws a TombstoneOverwhelmingException if appropriate.
+ */
+ private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
{
- return DatabaseDescriptor.getReadRpcTimeout();
+ class MetricRecording extends Transformation<UnfilteredRowIterator>
+ {
+ private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
+ private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
+
+ private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+
+ private int liveRows = 0;
+ private int tombstones = 0;
+
+ private DecoratedKey currentKey;
+
+ @Override
+ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+ {
+ currentKey = iter.partitionKey();
+ return Transformation.apply(iter, this);
+ }
+
+ @Override
+ public Row applyToStatic(Row row)
+ {
+ return applyToRow(row);
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ if (row.hasLiveData(ReadCommand.this.nowInSec()))
+ ++liveRows;
+
+ for (Cell cell : row.cells())
+ {
+ if (!cell.isLive(ReadCommand.this.nowInSec()))
+ countTombstone(row.clustering());
+ }
+ return row;
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ countTombstone(marker.clustering());
+ return marker;
+ }
+
+ private void countTombstone(ClusteringPrefix clustering)
+ {
+ ++tombstones;
+ if (tombstones > failureThreshold && respectTombstoneThresholds)
+ {
+ String query = ReadCommand.this.toCQLString();
+ Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
+ throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
+ }
+ }
+
+ @Override
+ public void onClose()
+ {
+ recordLatency(metric, System.nanoTime() - startTimeNanos);
+
+ metric.tombstoneScannedHistogram.update(tombstones);
+ metric.liveScannedHistogram.update(liveRows);
+
+ boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
+ if (warnTombstones)
+ {
+ String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
- ClientWarn.warn(msg);
++ ClientWarn.instance.warn(msg);
+ logger.warn(msg);
+ }
+
+ Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
+ }
+ };
+
+ return Transformation.apply(iter, new MetricRecording());
}
-}
-class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
- public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ /**
+ * Creates a message for this command.
+ */
+ public abstract MessageOut<ReadCommand> createMessage(int version);
+
+ protected abstract void appendCQLWhereClause(StringBuilder sb);
+
+ // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
+ // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
+ // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
+ protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
+ {
+ final boolean isForThrift = iterator.isForThrift();
+ class WithoutPurgeableTombstones extends PurgeFunction
+ {
+ public WithoutPurgeableTombstones()
+ {
+ super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
+ }
+
+ protected long getMaxPurgeableTimestamp()
+ {
+ return Long.MAX_VALUE;
+ }
+ }
+ return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+ }
+
+ /**
+ * Recreate the CQL string corresponding to this query.
+ * <p>
+ * Note that in general the returned string will not be exactly the original user string, first
+ * because there isn't always a single syntax for a given query, but also because we don't have
+ * all the information needed (we know the non-PK columns queried but not the PK ones as internally
+ * we query them all). So this shouldn't be relied on too strongly, but this should be good enough for
+ * debugging purposes, which is what this is for.
+ */
+ public String toCQLString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ").append(columnFilter());
+ sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
+ appendCQLWhereClause(sb);
+
+ if (limits() != DataLimits.NONE)
+ sb.append(' ').append(limits());
+ return sb.toString();
+ }
+
+ private static class Serializer implements IVersionedSerializer<ReadCommand>
+ {
+ private static int digestFlag(boolean isDigest)
+ {
+ return isDigest ? 0x01 : 0;
+ }
+
+ private static boolean isDigest(int flags)
+ {
+ return (flags & 0x01) != 0;
+ }
+
+ private static int thriftFlag(boolean isForThrift)
+ {
+ return isForThrift ? 0x02 : 0;
+ }
+
+ private static boolean isForThrift(int flags)
+ {
+ return (flags & 0x02) != 0;
+ }
+
+ private static int indexFlag(boolean hasIndex)
+ {
+ return hasIndex ? 0x04 : 0;
+ }
+
+ private static boolean hasIndex(int flags)
+ {
+ return (flags & 0x04) != 0;
+ }
+
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+ assert version >= MessagingService.VERSION_30;
+
+ out.writeByte(command.kind.ordinal());
+ out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
+ if (command.isDigestQuery())
+ out.writeUnsignedVInt(command.digestVersion());
+ CFMetaData.serializer.serialize(command.metadata(), out, version);
+ out.writeInt(command.nowInSec());
+ ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
+ RowFilter.serializer.serialize(command.rowFilter(), out, version);
+ DataLimits.serializer.serialize(command.limits(), out, version);
+ if (command.index.isPresent())
+ IndexMetadata.serializer.serialize(command.index.get(), out, version);
+
+ command.serializeSelection(out, version);
+ }
+
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ return legacyReadCommandSerializer.deserialize(in, version);
+
+ Kind kind = Kind.values()[in.readByte()];
+ int flags = in.readByte();
+ boolean isDigest = isDigest(flags);
+ boolean isForThrift = isForThrift(flags);
+ boolean hasIndex = hasIndex(flags);
+ int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ int nowInSec = in.readInt();
+ ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
+ RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
+ DataLimits limits = DataLimits.serializer.deserialize(in, version);
+ Optional<IndexMetadata> index = hasIndex
+ ? deserializeIndexMetadata(in, version, metadata)
+ : Optional.empty();
+
+ return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ }
+
+ private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ {
+ try
+ {
+ return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+ }
+ catch (UnknownIndexException e)
+ {
+ String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+ "If an index was just created, this is likely due to the schema not " +
+ "being fully propagated. Local read will proceed without using the " +
+ "index. Please wait for schema agreement after index creation.",
+ cfm.ksName, cfm.cfName, e.indexId.toString());
+ logger.info(message);
+ return Optional.empty();
+ }
+ }
+
+ public long serializedSize(ReadCommand command, int version)
+ {
+ // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+ assert version >= MessagingService.VERSION_30;
+
+ return 2 // kind + flags
+ + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+ + CFMetaData.serializer.serializedSize(command.metadata(), version)
+ + TypeSizes.sizeof(command.nowInSec())
+ + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ + RowFilter.serializer.serializedSize(command.rowFilter(), version)
+ + DataLimits.serializer.serializedSize(command.limits(), version)
+ + command.selectionSerializedSize(version)
+ + command.indexSerializedSize(version);
+ }
+ }
+
+ // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
+ // compatibility
+ private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
+ {
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_30)
+ legacyRangeSliceCommandSerializer.serialize(command, out, version);
+ else
+ serializer.serialize(command, out, version);
+ }
+
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceCommandSerializer.deserialize(in, version)
+ : serializer.deserialize(in, version);
+ }
+
+ public long serializedSize(ReadCommand command, int version)
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
+ : serializer.serializedSize(command, version);
+ }
+ }
+
+ private enum LegacyType
+ {
+ GET_BY_NAMES((byte)1),
+ GET_SLICES((byte)2);
+
+ public final byte serializedValue;
+
+ LegacyType(byte b)
+ {
+ this.serializedValue = b;
+ }
+
+ public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
+ {
+ return kind == ClusteringIndexFilter.Kind.SLICE
+ ? GET_SLICES
+ : GET_BY_NAMES;
+ }
+
+ public static LegacyType fromSerializedValue(byte b)
+ {
+ return b == 1 ? GET_BY_NAMES : GET_SLICES;
+ }
+ }
+
+ /**
+ * Serializer for pre-3.0 RangeSliceCommands.
+ */
+ private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
{
- out.writeByte(command.commandType.serializedValue);
- switch (command.commandType)
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+ assert !rangeCommand.dataRange().isPaging();
+
+ // convert pre-3.0 incompatible names filters to slice filters
+ rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+
+ CFMetaData metadata = rangeCommand.metadata();
+
+ out.writeUTF(metadata.ksName);
+ out.writeUTF(metadata.cfName);
+ out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
+
+ // begin DiskAtomFilterSerializer.serialize()
+ if (rangeCommand.isNamesQuery())
+ {
+ out.writeByte(1); // 0 for slices, 1 for names
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
+ }
+ else
+ {
+ out.writeByte(0); // 0 for slices, 1 for names
+
+ // slice filter serialization
+ ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+ boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+
+ out.writeBoolean(filter.isReversed());
+
+ // limit
+ DataLimits.Kind kind = rangeCommand.limits().kind();
+ boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
+ if (isDistinct)
+ out.writeInt(1);
+ else
+ out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
+
+ int compositesToGroup;
+ boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ if (kind == DataLimits.Kind.THRIFT_LIMIT)
+ compositesToGroup = -1;
+ else if (isDistinct && !selectsStatics)
+ compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
+ else
+ compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+ out.writeInt(compositesToGroup);
+ }
+
+ serializeRowFilter(out, rangeCommand.rowFilter());
+ AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+ // maxResults
+ out.writeInt(rangeCommand.limits().count());
+
+ // countCQL3Rows
+ if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT
+ out.writeBoolean(false);
+ else
+ out.writeBoolean(true);
+
+ // isPaging
+ out.writeBoolean(false);
+ }
+
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ String keyspace = in.readUTF();
+ String columnFamily = in.readUTF();
+
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ if (metadata == null)
+ {
+ String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
+ throw new UnknownColumnFamilyException(message, null);
+ }
+
+ int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds
+
+ ClusteringIndexFilter filter;
+ ColumnFilter selection;
+ int compositesToGroup = 0;
+ int perPartitionLimit = -1;
+ byte readType = in.readByte(); // 0 for slices, 1 for names
+ if (readType == 1)
+ {
+ Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
+ selection = selectionAndFilter.left;
+ filter = selectionAndFilter.right;
+ }
+ else
+ {
+ Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+ filter = p.left;
+ perPartitionLimit = in.readInt();
+ compositesToGroup = in.readInt();
+ selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
+ }
+
+ RowFilter rowFilter = deserializeRowFilter(in, metadata);
+
+ AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+ int maxResults = in.readInt();
+
+ in.readBoolean(); // countCQL3Rows (not needed)
+ in.readBoolean(); // isPaging (not needed)
+
+ boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+ boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+ DataLimits limits;
+ if (isDistinct)
+ limits = DataLimits.distinctLimits(maxResults);
+ else if (compositesToGroup == -1)
+ limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
+ else
+ limits = DataLimits.cqlLimits(maxResults);
+
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
+ }
+
+ static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
+ {
+ ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
+ out.writeInt(indexExpressions.size());
+ for (RowFilter.Expression expression : indexExpressions)
+ {
+ ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
+ expression.operator().writeTo(out);
+ ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
+ }
+ }
+
+ static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+ int numRowFilters = in.readInt();
+ if (numRowFilters == 0)
+ return RowFilter.NONE;
+
+ RowFilter rowFilter = RowFilter.create(numRowFilters);
+ for (int i = 0; i < numRowFilters; i++)
+ {
+ ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
+ ColumnDefinition column = metadata.getColumnDefinition(columnName);
+ Operator op = Operator.readFrom(in);
+ ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
+ rowFilter.add(column, op, indexValue);
+ }
+ return rowFilter;
+ }
+
+ static long serializedRowFilterSize(RowFilter rowFilter)
+ {
+ long size = TypeSizes.sizeof(0); // rowFilterCount
+ for (RowFilter.Expression expression : rowFilter)
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+ size += TypeSizes.sizeof(0); // operator int value
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+ }
+ return size;
+ }
+
+ public long serializedSize(ReadCommand command, int version)
+ {
+ assert version < MessagingService.VERSION_30;
+ assert command.kind == Kind.PARTITION_RANGE;
+
+ PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+ rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+ CFMetaData metadata = rangeCommand.metadata();
+
+ long size = TypeSizes.sizeof(metadata.ksName);
+ size += TypeSizes.sizeof(metadata.cfName);
+ size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+ size += 1; // single byte flag: 0 for slices, 1 for names
+ if (rangeCommand.isNamesQuery())
+ {
+ PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
+ }
+ else
+ {
+ ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+ size += TypeSizes.sizeof(filter.isReversed());
+ size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+ size += TypeSizes.sizeof(0); // compositesToGroup
+ }
+
+ if (rangeCommand.rowFilter().equals(RowFilter.NONE))
+ {
+ size += TypeSizes.sizeof(0);
+ }
+ else
+ {
+ ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
+ size += TypeSizes.sizeof(indexExpressions.size());
+ for (RowFilter.Expression expression : indexExpressions)
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+ size += TypeSizes.sizeof(expression.operator().ordinal());
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+ }
+ }
+
+ size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+ size += TypeSizes.sizeof(rangeCommand.limits().count());
+ size += TypeSizes.sizeof(!rangeCommand.isForThrift());
+ return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
+ }
+
+ static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
{
- case GET_BY_NAMES:
- SliceByNamesReadCommand.serializer.serialize(command, out, version);
- break;
- case GET_SLICES:
- SliceFromReadCommand.serializer.serialize(command, out, version);
- break;
- default:
- throw new AssertionError();
+ if (!command.dataRange().isNamesQuery())
+ return command;
+
+ CFMetaData metadata = command.metadata();
+ if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
+ return command;
+
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
+ ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
+ DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
+ return new PartitionRangeReadCommand(
+ command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
+ command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
+ }
+
+ static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
+ {
+ // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
+ // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
+ // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
+ if (compositesToGroup == -2)
+ return ColumnFilter.all(metadata);
+
+ // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+ PartitionColumns columns = selectsStatics
+ ? metadata.partitionColumns()
+ : metadata.partitionColumns().withoutStatics();
+ return ColumnFilter.selectionBuilder().addAll(columns).build();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index ccc900b,459923b..d416dca
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -39,8 -39,12 +39,9 @@@ import com.google.common.collect.Lists
import com.google.common.collect.Sets;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ExecutorLocal;
+ import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;