Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/01/13 20:48:59 UTC

[08/13] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94e7ef17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94e7ef17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94e7ef17

Branch: refs/heads/cassandra-3.0
Commit: 94e7ef17757235cb35df70ff9a2a63e1d29d6c41
Parents: 0f995a2 dbf6e62
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jan 13 14:45:13 2016 -0500
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Wed Jan 13 14:45:13 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../AbstractLocalAwareExecutorService.java      | 230 +++++++++++++++++++
 .../AbstractTracingAwareExecutorService.java    | 230 -------------------
 .../DebuggableThreadPoolExecutor.java           |  48 ++--
 .../cassandra/concurrent/ExecutorLocal.java     |  44 ++++
 .../cassandra/concurrent/ExecutorLocals.java    |  84 +++++++
 .../concurrent/LocalAwareExecutorService.java   |  34 +++
 .../cassandra/concurrent/SEPExecutor.java       |   3 +-
 .../concurrent/SharedExecutorPool.java          |   2 +-
 .../cassandra/concurrent/StageManager.java      |  12 +-
 .../concurrent/TracingAwareExecutorService.java |  36 ---
 .../cassandra/cql3/functions/UDFunction.java    |   2 +-
 .../cql3/statements/BatchStatement.java         |   9 +-
 .../cql3/statements/CreateViewStatement.java    |   2 +-
 .../cql3/statements/SelectStatement.java        |   4 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +-
 .../apache/cassandra/net/MessagingService.java  |   7 +-
 .../apache/cassandra/service/ClientWarn.java    |  62 +++--
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   3 +-
 .../org/apache/cassandra/transport/Message.java |   6 +-
 .../transport/RequestThreadPoolExecutor.java    |   4 +-
 .../cql3/validation/entities/UFTest.java        |   6 +-
 .../cassandra/service/ClientWarningsTest.java   |  58 +++++
 24 files changed, 545 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
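
The diffstat already tells the story of the merge: the tracing-specific executor plumbing (AbstractTracingAwareExecutorService, TracingAwareExecutorService) is replaced by a generalized "local aware" variant plus new ExecutorLocal/ExecutorLocals types, so that both tracing state and the new per-request ClientWarn state can travel with tasks across thread pools. Below is a minimal, hypothetical sketch of that capture-and-restore pattern; every name in it is invented for illustration and is not Cassandra's actual API.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only: a generic "executor local" that is captured on
// the submitting thread and re-installed on the worker thread, so request state
// (tracing session, client warnings, ...) follows the task across thread pools.
interface Local<T>
{
    T current();       // read the value on the calling thread
    void set(T value); // install a value on the current thread
}

public final class PropagatingDemo
{
    // Wrap a Runnable so the captured value travels with it, and the worker's
    // previous value is always restored afterwards.
    static <T> Runnable capture(Local<T> local, Runnable task)
    {
        final T captured = local.current();
        return () -> {
            T old = local.current();
            local.set(captured);
            try
            {
                task.run();
            }
            finally
            {
                local.set(old);
            }
        };
    }

    public static void main(String[] args) throws Exception
    {
        Local<String> trace = new Local<String>()
        {
            private final ThreadLocal<String> tl = new ThreadLocal<>();
            public String current() { return tl.get(); }
            public void set(String v) { tl.set(v); }
        };
        trace.set("session-42");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(capture(trace, () -> System.out.println("worker sees: " + trace.current())));
        pool.shutdown();
    }
}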


http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 614d5b4,6530956..a37ec99
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -2.2.5
 +3.0.3
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
+  * Make sure client gets tombstone overwhelmed warning (CASSANDRA-9465)
   * Fix error streaming section more than 2GB (CASSANDRA-10961)
   * (cqlsh) Also apply --connect-timeout to control connection
     timeout (CASSANDRA-10959)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
index 0000000,088b43e..f47d8ac
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
+++ b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java
@@@ -1,0 -1,229 +1,230 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.concurrent;
+ 
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.tracing.TraceState;
+ import org.apache.cassandra.tracing.Tracing;
+ import org.apache.cassandra.utils.concurrent.SimpleCondition;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ 
+ import static org.apache.cassandra.tracing.Tracing.isTracing;
+ 
+ public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService
+ {
+     private static final Logger logger = LoggerFactory.getLogger(AbstractLocalAwareExecutorService.class);
+ 
+     protected abstract void addTask(FutureTask<?> futureTask);
+     protected abstract void onCompletion();
+ 
+     /** Task Submission / Creation / Objects **/
+ 
+     public <T> FutureTask<T> submit(Callable<T> task)
+     {
+         return submit(newTaskFor(task));
+     }
+ 
+     public FutureTask<?> submit(Runnable task)
+     {
+         return submit(newTaskFor(task, null));
+     }
+ 
+     public <T> FutureTask<T> submit(Runnable task, T result)
+     {
+         return submit(newTaskFor(task, result));
+     }
+ 
+     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+     {
+         throw new UnsupportedOperationException();
+     }
+ 
+     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+     {
+         throw new UnsupportedOperationException();
+     }
+ 
+     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+     {
+         throw new UnsupportedOperationException();
+     }
+ 
+     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+     {
+         throw new UnsupportedOperationException();
+     }
+ 
+     protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result)
+     {
+         return newTaskFor(runnable, result, ExecutorLocals.create());
+     }
+ 
+     protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals)
+     {
+         if (locals != null)
+         {
+             if (runnable instanceof LocalSessionFutureTask)
+                 return (LocalSessionFutureTask<T>) runnable;
+             return new LocalSessionFutureTask<T>(runnable, result, locals);
+         }
+         if (runnable instanceof FutureTask)
+             return (FutureTask<T>) runnable;
+         return new FutureTask<>(runnable, result);
+     }
+ 
+     protected <T> FutureTask<T> newTaskFor(Callable<T> callable)
+     {
+         if (isTracing())
+         {
+             if (callable instanceof LocalSessionFutureTask)
+                 return (LocalSessionFutureTask<T>) callable;
+             return new LocalSessionFutureTask<T>(callable, ExecutorLocals.create());
+         }
+         if (callable instanceof FutureTask)
+             return (FutureTask<T>) callable;
+         return new FutureTask<>(callable);
+     }
+ 
+     private class LocalSessionFutureTask<T> extends FutureTask<T>
+     {
+         private final ExecutorLocals locals;
+ 
+         public LocalSessionFutureTask(Callable<T> callable, ExecutorLocals locals)
+         {
+             super(callable);
+             this.locals = locals;
+         }
+ 
+         public LocalSessionFutureTask(Runnable runnable, T result, ExecutorLocals locals)
+         {
+             super(runnable, result);
+             this.locals = locals;
+         }
+ 
+         public void run()
+         {
+             ExecutorLocals old = ExecutorLocals.create();
+             ExecutorLocals.set(locals);
+             try
+             {
+                 super.run();
+             }
+             finally
+             {
+                 ExecutorLocals.set(old);
+             }
+         }
+     }
+ 
+     class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable
+     {
+         private boolean failure;
+         private Object result = this;
+         private final Callable<T> callable;
+ 
+         public FutureTask(Callable<T> callable)
+         {
+             this.callable = callable;
+         }
+         public FutureTask(Runnable runnable, T result)
+         {
+             this(Executors.callable(runnable, result));
+         }
+ 
+         public void run()
+         {
+             try
+             {
+                 result = callable.call();
+             }
+             catch (Throwable t)
+             {
+                 JVMStabilityInspector.inspectThrowable(t);
+                 logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t);
+                 result = t;
+                 failure = true;
+             }
+             finally
+             {
+                 signalAll();
+                 onCompletion();
+             }
+         }
+ 
+         public boolean cancel(boolean mayInterruptIfRunning)
+         {
+             return false;
+         }
+ 
+         public boolean isCancelled()
+         {
+             return false;
+         }
+ 
+         public boolean isDone()
+         {
+             return isSignaled();
+         }
+ 
+         public T get() throws InterruptedException, ExecutionException
+         {
+             await();
+             Object result = this.result;
+             if (failure)
+                 throw new ExecutionException((Throwable) result);
+             return (T) result;
+         }
+ 
+         public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+         {
 -            await(timeout, unit);
++            if (!await(timeout, unit))
++                throw new TimeoutException();
+             Object result = this.result;
+             if (failure)
+                 throw new ExecutionException((Throwable) result);
+             return (T) result;
+         }
+     }
+ 
+     private <T> FutureTask<T> submit(FutureTask<T> task)
+     {
+         addTask(task);
+         return task;
+     }
+ 
+     public void execute(Runnable command)
+     {
+         addTask(newTaskFor(command, ExecutorLocals.create()));
+     }
+ 
+     public void execute(Runnable command, ExecutorLocals locals)
+     {
+         addTask(newTaskFor(command, null, locals));
+     }
+ }
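
The only hand-resolved hunk in this file is in FutureTask.get(long, TimeUnit): the 2.2 code called await(timeout, unit) and discarded its boolean return, so a timed-out get() would fall through and return a result that was never set; the merged code throws TimeoutException instead. A standalone repro of both shapes, using a plain CountDownLatch as an illustrative stand-in for SimpleCondition:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Condition-style await() reports timeout through its boolean return value, so
// discarding that value silently treats a timeout as success.
public class AwaitTimeoutDemo
{
    public static void main(String[] args) throws Exception
    {
        CountDownLatch done = new CountDownLatch(1); // never counted down

        // Pre-merge shape: the false return is ignored, and the caller would
        // go on to read a result that was never produced.
        boolean completed = done.await(100, TimeUnit.MILLISECONDS);
        System.out.println("await returned " + completed + ", no exception raised");

        // Post-merge shape: surface the timeout to the caller.
        if (!done.await(100, TimeUnit.MILLISECONDS))
            throw new TimeoutException("task did not complete in time");
    }
}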

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 04a4c3d,1e5cea6..fa0d306
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -269,164 -143,11 +269,164 @@@ public abstract class UDFunction extend
              return null;
  
          long tStart = System.nanoTime();
 -        ByteBuffer result = executeUserDefined(protocolVersion, parameters);
 -        Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
 -        return result;
 +        parameters = makeEmptyParametersNull(parameters);
 +
 +        try
 +        {
 +            // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr).
 +            ByteBuffer result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
 +                                ? executeAsync(protocolVersion, parameters)
 +                                : executeUserDefined(protocolVersion, parameters);
 +
 +            Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
 +            return result;
 +        }
 +        catch (InvalidRequestException e)
 +        {
 +            throw e;
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.trace("Invocation of user-defined function '{}' failed", this, t);
 +            if (t instanceof VirtualMachineError)
 +                throw (VirtualMachineError) t;
 +            throw FunctionExecutionException.create(this, t);
 +        }
      }
  
 +    public static void assertUdfsEnabled(String language)
 +    {
 +        if (!DatabaseDescriptor.enableUserDefinedFunctions())
 +            throw new InvalidRequestException("User-defined functions are disabled in cassandra.yaml - set enable_user_defined_functions=true to enable");
 +        if (!"java".equalsIgnoreCase(language) && !DatabaseDescriptor.enableScriptedUserDefinedFunctions())
 +            throw new InvalidRequestException("Scripted user-defined functions are disabled in cassandra.yaml - set enable_scripted_user_defined_functions=true to enable if you are aware of the security risks");
 +    }
 +
 +    static void initializeThread()
 +    {
 +        // Get the TypeCodec stuff in Java Driver initialized.
 +        // This is to get the classes loaded outside of the restricted sandbox's security context of a UDF.
 +        UDHelper.codecFor(DataType.inet()).format(InetAddress.getLoopbackAddress());
 +        UDHelper.codecFor(DataType.ascii()).format("");
 +    }
 +
 +    private static final class ThreadIdAndCpuTime extends CompletableFuture<Object>
 +    {
 +        long threadId;
 +        long cpuTime;
 +
 +        ThreadIdAndCpuTime()
 +        {
 +            // Looks weird?
 +            // This call "just" links this class to java.lang.management - otherwise UDFs (script UDFs) might fail due to
 +            //      java.security.AccessControlException: access denied: ("java.lang.RuntimePermission" "accessClassInPackage.java.lang.management")
 +            // because class loading would be deferred until setup() is executed - but setup() is called with
 +            // limited privileges.
 +            threadMXBean.getCurrentThreadCpuTime();
 +        }
 +
 +        void setup()
 +        {
 +            this.threadId = Thread.currentThread().getId();
 +            this.cpuTime = threadMXBean.getCurrentThreadCpuTime();
 +            complete(null);
 +        }
 +    }
 +
 +    private ByteBuffer executeAsync(int protocolVersion, List<ByteBuffer> parameters)
 +    {
 +        ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
 +
 +        Future<ByteBuffer> future = executor().submit(() -> {
 +            threadIdAndCpuTime.setup();
 +            return executeUserDefined(protocolVersion, parameters);
 +        });
 +
 +        try
 +        {
 +            if (DatabaseDescriptor.getUserDefinedFunctionWarnTimeout() > 0)
 +                try
 +                {
 +                    return future.get(DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
 +                }
 +                catch (TimeoutException e)
 +                {
 +
 +                    // log and emit a client warning that UDF execution took too long
 +                    String warn = String.format("User defined function %s ran longer than %dms", this, DatabaseDescriptor.getUserDefinedFunctionWarnTimeout());
 +                    logger.warn(warn);
-                     ClientWarn.warn(warn);
++                    ClientWarn.instance.warn(warn);
 +                }
 +
 +            // retry with the remaining budget (fail timeout minus warn timeout)
 +            return future.get(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            Thread.currentThread().interrupt();
 +            throw new RuntimeException(e);
 +        }
 +        catch (ExecutionException e)
 +        {
 +            Throwable c = e.getCause();
 +            if (c instanceof RuntimeException)
 +                throw (RuntimeException) c;
 +            throw new RuntimeException(c);
 +        }
 +        catch (TimeoutException e)
 +        {
 +            // retry one last time with the difference between the UDF fail timeout and the consumed CPU time (in case execution hit a badly timed GC)
 +            try
 +            {
 +                // threadIdAndCpuTime shouldn't take long to be set, so this should return immediately
 +                threadIdAndCpuTime.get(1, TimeUnit.SECONDS);
 +
 +                long cpuTimeMillis = threadMXBean.getThreadCpuTime(threadIdAndCpuTime.threadId) - threadIdAndCpuTime.cpuTime;
 +                cpuTimeMillis /= 1000000L;
 +
 +                return future.get(Math.max(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - cpuTimeMillis, 0L),
 +                                  TimeUnit.MILLISECONDS);
 +            }
 +            catch (InterruptedException e1)
 +            {
 +                Thread.currentThread().interrupt();
 +                throw new RuntimeException(e);
 +            }
 +            catch (ExecutionException e1)
 +            {
 +                Throwable c = e.getCause();
 +                if (c instanceof RuntimeException)
 +                    throw (RuntimeException) c;
 +                throw new RuntimeException(c);
 +            }
 +            catch (TimeoutException e1)
 +            {
 +                TimeoutException cause = new TimeoutException(String.format("User defined function %s ran longer than %dms%s",
 +                                                                            this,
 +                                                                            DatabaseDescriptor.getUserDefinedFunctionFailTimeout(),
 +                                                                            DatabaseDescriptor.getUserFunctionTimeoutPolicy() == Config.UserFunctionTimeoutPolicy.ignore
 +                                                                            ? "" : " - will stop Cassandra VM"));
 +                FunctionExecutionException fe = FunctionExecutionException.create(this, cause);
 +                JVMStabilityInspector.userFunctionTimeout(cause);
 +                throw fe;
 +            }
 +        }
 +    }
 +
 +    private List<ByteBuffer> makeEmptyParametersNull(List<ByteBuffer> parameters)
 +    {
 +        List<ByteBuffer> r = new ArrayList<>(parameters.size());
 +        for (int i = 0; i < parameters.size(); i++)
 +        {
 +            ByteBuffer param = parameters.get(i);
 +            r.add(UDHelper.isNullOrEmpty(argTypes.get(i), param)
 +                  ? null : param);
 +        }
 +        return r;
 +    }
 +
 +    protected abstract ExecutorService executor();
 +
      public boolean isCallableWrtNullable(List<ByteBuffer> parameters)
      {
          if (!calledOnNullInput)
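
The executeAsync() method added above implements a two-phase wait: block up to the warn timeout, emit a client warning if it expires, then wait only the remaining budget up to the fail timeout (with one further CPU-time-based retry to forgive a badly timed GC). A stripped-down sketch of the first two phases follows; the constants and println stand in for DatabaseDescriptor's settings and ClientWarn and are not the real API.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Warn-then-fail wait: one total budget of FAIL_MS, split at WARN_MS.
public class TwoPhaseTimeoutDemo
{
    static final long WARN_MS = 500;
    static final long FAIL_MS = 1500;

    static <T> T getWithWarning(Future<T> future) throws Exception
    {
        try
        {
            // Phase 1: wait up to the warn threshold.
            return future.get(WARN_MS, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e)
        {
            System.out.println("warning: task ran longer than " + WARN_MS + "ms");
        }
        // Phase 2: wait only the remaining budget, keeping the total at FAIL_MS.
        return future.get(FAIL_MS - WARN_MS, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception
    {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> slow = pool.submit(() -> {
            Thread.sleep(1000); // longer than WARN_MS, shorter than FAIL_MS
            return "done";
        });
        System.out.println(getWithWarning(slow));
        pool.shutdown();
    }
}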

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 3979597,a289ad1..47396fb
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -226,35 -179,67 +226,35 @@@ public class BatchStatement implements 
          for (int i = 0; i < statements.size(); i++)
          {
              ModificationStatement statement = statements.get(i);
 +            if (isLogged() && statement.cfm.params.gcGraceSeconds == 0)
 +            {
 +                if (tablesWithZeroGcGs == null)
 +                    tablesWithZeroGcGs = new HashSet<>();
 +                tablesWithZeroGcGs.add(String.format("%s.%s", statement.cfm.ksName, statement.cfm.cfName));
 +            }
              QueryOptions statementOptions = options.forStatement(i);
              long timestamp = attrs.getTimestamp(now, statementOptions);
 -            addStatementMutations(statement, statementOptions, local, timestamp, mutations);
 +            statement.addUpdates(collector, statementOptions, local, timestamp);
          }
 -        return unzipMutations(mutations);
 -    }
 -
 -    private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations)
 -    {
 -
 -        // The case where all statement where on the same keyspace is pretty common
 -        if (mutations.size() == 1)
 -            return mutations.values().iterator().next().values();
 -
  
 -        List<IMutation> ms = new ArrayList<>();
 -        for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
 -            ms.addAll(ksMap.values());
 -
 -        return ms;
 -    }
 -
 -    private void addStatementMutations(ModificationStatement statement,
 -                                       QueryOptions options,
 -                                       boolean local,
 -                                       long now,
 -                                       Map<String, Map<ByteBuffer, IMutation>> mutations)
 -    throws RequestExecutionException, RequestValidationException
 -    {
 -        String ksName = statement.keyspace();
 -        Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
 -        if (ksMap == null)
 +        if (tablesWithZeroGcGs != null)
          {
 -            ksMap = new HashMap<>();
 -            mutations.put(ksName, ksMap);
 +            String suffix = tablesWithZeroGcGs.size() == 1 ? "" : "s";
 +            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, LOGGED_BATCH_LOW_GCGS_WARNING,
 +                             suffix, tablesWithZeroGcGs);
-             ClientWarn.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
-                                             .getMessage());
++            ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
++                                                     .getMessage());
          }
  
 -        // The following does the same than statement.getMutations(), but we inline it here because
 -        // we don't want to recreate mutations every time as this is particularly inefficient when applying
 -        // multiple batch to the same partition (see #6737).
 -        List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
 -        Composite clusteringPrefix = statement.createClusteringPrefix(options);
 -        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
 -
 -        for (ByteBuffer key : keys)
 -        {
 -            IMutation mutation = ksMap.get(key);
 -            Mutation mut;
 -            if (mutation == null)
 -            {
 -                mut = new Mutation(ksName, key);
 -                mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
 -                ksMap.put(key, mutation);
 -            }
 -            else
 -            {
 -                mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
 -            }
 +        collector.validateIndexedColumns();
 +        return collector.toMutations();
 +    }
  
 -            statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
 -        }
 +    private int updatedRows()
 +    {
 +        // Note: it's possible for 2 statements to actually apply to the same row, but that's just an estimation
 +        // for sizing our PartitionUpdate backing array, so it's good enough.
 +        return statements.size();
      }
  
      /**
@@@ -286,9 -271,9 +286,9 @@@
              }
              else if (logger.isWarnEnabled())
              {
 -                logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "");
 +                logger.warn(format, tableNames, size, warnThreshold, size - warnThreshold, "");
              }
-             ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage());
 -            ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[]{ ksCfPairs, size, warnThreshold, size - warnThreshold, "" }).getMessage());
++            ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage());
          }
      }
  
@@@ -311,17 -298,21 +311,16 @@@
              }
  
              // CASSANDRA-9303: If we only have local mutations we do not warn
 -            if (localMutationsOnly)
 +            if (localPartitionsOnly)
                  return;
  
 -            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning,
 -                             keySet.size(), keySet.size() == 1 ? "" : "s",
 -                             ksCfPairs.size() == 1 ? "" : "s", ksCfPairs);
  
 -            ClientWarn.instance.warn(MessageFormatter.arrayFormat(unloggedBatchWarning,
 -                                                                  new Object[]{
 -                                                                              keySet.size(),
 -                                                                              keySet.size() == 1 ? "" : "s",
 -                                                                              ksCfPairs.size() == 1 ? "" : "s",
 -                                                                              ksCfPairs
 -                                                                  }).getMessage());
 +            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING,
 +                             keySet.size(), keySet.size() == 1 ? "" : "s",
 +                             tableNames.size() == 1 ? "" : "s", tableNames);
  
-             ClientWarn.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
++            ClientWarn.instance.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s",
 +                                                    tableNames.size() == 1 ? "" : "s", tableNames}).getMessage());
- 
          }
      }
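
Every conflict in this file resolves the same way: the static ClientWarn.warn(..) of 2.2 becomes ClientWarn.instance.warn(..) in 3.0. Conceptually the sink is a per-request, thread-local buffer of warnings that is drained into the client response. A hypothetical stand-in showing that shape (not the real ClientWarn implementation):

import java.util.ArrayList;
import java.util.List;

// Warnings accumulate in a per-request thread-local buffer and are drained
// when the response is built; outside a captured request, warn() is a no-op.
public class WarningCollectorDemo
{
    public static final WarningCollectorDemo instance = new WarningCollectorDemo();

    private final ThreadLocal<List<String>> warnings = new ThreadLocal<>();

    public void captureWarnings()
    {
        warnings.set(new ArrayList<>());
    }

    public void warn(String message)
    {
        List<String> buffer = warnings.get();
        if (buffer != null) // no request being captured -> drop silently
            buffer.add(message);
    }

    public List<String> popWarnings()
    {
        List<String> buffer = warnings.get();
        warnings.remove();
        return buffer;
    }

    public static void main(String[] args)
    {
        instance.captureWarnings();
        instance.warn("Unlogged batch covering 12 partitions detected");
        System.out.println(instance.popWarnings());
    }
}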
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 4017ce6,0000000..5af4887
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@@ -1,330 -1,0 +1,330 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3.statements;
 +
 +import java.util.*;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.auth.Permission;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.config.ViewDefinition;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 +import org.apache.cassandra.cql3.selection.RawSelector;
 +import org.apache.cassandra.cql3.selection.Selectable;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.ReversedType;
 +import org.apache.cassandra.db.view.View;
 +import org.apache.cassandra.exceptions.AlreadyExistsException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.exceptions.RequestValidationException;
 +import org.apache.cassandra.exceptions.UnauthorizedException;
 +import org.apache.cassandra.schema.TableParams;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.service.MigrationManager;
 +import org.apache.cassandra.thrift.ThriftValidation;
 +import org.apache.cassandra.transport.Event;
 +
 +public class CreateViewStatement extends SchemaAlteringStatement
 +{
 +    private final CFName baseName;
 +    private final List<RawSelector> selectClause;
 +    private final WhereClause whereClause;
 +    private final List<ColumnIdentifier.Raw> partitionKeys;
 +    private final List<ColumnIdentifier.Raw> clusteringKeys;
 +    public final CFProperties properties = new CFProperties();
 +    private final boolean ifNotExists;
 +
 +    public CreateViewStatement(CFName viewName,
 +                               CFName baseName,
 +                               List<RawSelector> selectClause,
 +                               WhereClause whereClause,
 +                               List<ColumnIdentifier.Raw> partitionKeys,
 +                               List<ColumnIdentifier.Raw> clusteringKeys,
 +                               boolean ifNotExists)
 +    {
 +        super(viewName);
 +        this.baseName = baseName;
 +        this.selectClause = selectClause;
 +        this.whereClause = whereClause;
 +        this.partitionKeys = partitionKeys;
 +        this.clusteringKeys = clusteringKeys;
 +        this.ifNotExists = ifNotExists;
 +    }
 +
 +
 +    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
 +    {
 +        if (!baseName.hasKeyspace())
 +            baseName.setKeyspace(keyspace(), true);
 +        state.hasColumnFamilyAccess(keyspace(), baseName.getColumnFamily(), Permission.ALTER);
 +    }
 +
 +    public void validate(ClientState state) throws RequestValidationException
 +    {
 +        // We do validation in announceMigration to avoid duplicating work
 +    }
 +
 +    private interface AddColumn {
 +        void add(ColumnIdentifier identifier, AbstractType<?> type);
 +    }
 +
 +    private void add(CFMetaData baseCfm, Iterable<ColumnIdentifier> columns, AddColumn adder)
 +    {
 +        for (ColumnIdentifier column : columns)
 +        {
 +            AbstractType<?> type = baseCfm.getColumnDefinition(column).type;
 +            if (properties.definedOrdering.containsKey(column))
 +            {
 +                boolean desc = properties.definedOrdering.get(column);
 +                if (!desc && type.isReversed())
 +                {
 +                    type = ((ReversedType)type).baseType;
 +                }
 +                else if (desc && !type.isReversed())
 +                {
 +                    type = ReversedType.getInstance(type);
 +                }
 +            }
 +            adder.add(column, type);
 +        }
 +    }
 +
 +    public Event.SchemaChange announceMigration(boolean isLocalOnly) throws RequestValidationException
 +    {
 +        // We need to make sure that:
 +        //  - primary key includes all columns in base table's primary key
 +        //  - make sure that the select statement does not have anything other than columns
 +        //    and their names match the base table's names
 +        //  - make sure that primary key does not include any collections
 +        //  - make sure there is no where clause in the select statement
 +        //  - make sure there is not currently a table or view
 +        //  - make sure baseTable gcGraceSeconds > 0
 +
 +        properties.validate();
 +
 +        if (properties.useCompactStorage)
 +            throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");
 +
 +        // We enforce the keyspace because if the RF is different, the logic to wait for a
 +        // specific replica would break
 +        if (!baseName.getKeyspace().equals(keyspace()))
 +            throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace");
 +
 +        CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());
 +
 +        if (cfm.isCounter())
 +            throw new InvalidRequestException("Materialized views are not supported on counter tables");
 +        if (cfm.isView())
 +            throw new InvalidRequestException("Materialized views cannot be created against other materialized views");
 +
 +        if (cfm.params.gcGraceSeconds == 0)
 +        {
 +            throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " +
 +                                                            "'%s' with gc_grace_seconds of 0, since this value is " +
 +                                                            "used to TTL undelivered updates. Setting gc_grace_seconds" +
 +                                                            " too low might cause undelivered updates to expire " +
 +                                                            "before being replayed.", cfName.getColumnFamily(),
 +                                                            baseName.getColumnFamily()));
 +        }
 +
 +        Set<ColumnIdentifier> included = new HashSet<>();
 +        for (RawSelector selector : selectClause)
 +        {
 +            Selectable.Raw selectable = selector.selectable;
 +            if (selectable instanceof Selectable.WithFieldSelection.Raw)
 +                throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view");
 +            if (selectable instanceof Selectable.WithFunction.Raw)
 +                throw new InvalidRequestException("Cannot use function when defining a materialized view");
 +            if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
 +                throw new InvalidRequestException("Cannot use function when defining a materialized view");
 +            ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm);
 +            if (selector.alias != null)
 +                throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString()));
 +
 +            ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
 +
 +            if (cdef == null)
 +                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
 +
 +            if (cdef.isStatic())
-                 ClientWarn.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
++                ClientWarn.instance.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
 +            else
 +                included.add(identifier);
 +        }
 +
 +        Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
 +        for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
 +        {
 +            if (!targetPrimaryKeys.add(identifier))
 +                throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);
 +
 +            ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm));
 +
 +            if (cdef == null)
 +                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
 +
 +            if (cfm.getColumnDefinition(identifier.prepare(cfm)).type.isMultiCell())
 +                throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
 +
 +            if (cdef.isStatic())
 +                throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));
 +        }
 +
 +        // build the select statement
 +        Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap();
 +        SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false);
 +        SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null);
 +
 +        ClientState state = ClientState.forInternalCalls();
 +        state.setKeyspace(keyspace());
 +
 +        rawSelect.prepareKeyspace(state);
 +        rawSelect.setBoundVariables(getBoundVariables());
 +
 +        ParsedStatement.Prepared prepared = rawSelect.prepare(true);
 +        SelectStatement select = (SelectStatement) prepared.statement;
 +        StatementRestrictions restrictions = select.getRestrictions();
 +
 +        if (!prepared.boundNames.isEmpty())
 +            throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements");
 +
 +        if (!restrictions.nonPKRestrictedColumns(false).isEmpty())
 +        {
 +            throw new InvalidRequestException(String.format(
 +                    "Non-primary key columns cannot be restricted in the SELECT statement used for materialized view " +
 +                    "creation (got restrictions on: %s)",
 +                    restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", "))));
 +        }
 +
 +        String whereClauseText = View.relationsToWhereClause(whereClause.relations);
 +
 +        Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
 +        for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
 +            basePrimaryKeyCols.add(definition.name);
 +
 +        List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
 +        List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();
 +
 +        // This is only used as an intermediate state; this is to catch whether multiple non-PK columns are used
 +        boolean hasNonPKColumn = false;
 +        for (ColumnIdentifier.Raw raw : partitionKeys)
 +            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions);
 +
 +        for (ColumnIdentifier.Raw raw : clusteringKeys)
 +            hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, restrictions);
 +
 +        // We need to include all of the primary key columns from the base table in order to make sure that we do not
 +        // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
 +        // the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
 +        // used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
 +        // that they include all of the columns. We provide them with a list of all of the columns left to include.
 +        boolean missingClusteringColumns = false;
 +        StringBuilder columnNames = new StringBuilder();
 +        List<ColumnIdentifier> includedColumns = new ArrayList<>();
 +        for (ColumnDefinition def : cfm.allColumns())
 +        {
 +            ColumnIdentifier identifier = def.name;
 +
 +            if ((included.isEmpty() || included.contains(identifier))
 +                && !targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier)
 +                && !def.isStatic())
 +            {
 +                includedColumns.add(identifier);
 +            }
 +            if (!def.isPrimaryKeyColumn()) continue;
 +
 +            if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
 +            {
 +                if (missingClusteringColumns)
 +                    columnNames.append(',');
 +                else
 +                    missingClusteringColumns = true;
 +                columnNames.append(identifier);
 +            }
 +        }
 +        if (missingClusteringColumns)
 +            throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
 +                                                            columnFamily(), baseName.getColumnFamily(), columnNames.toString()));
 +
 +        if (targetPartitionKeys.isEmpty())
 +            throw new InvalidRequestException("Must select at least a column for a Materialized View");
 +
 +        if (targetClusteringColumns.isEmpty())
 +            throw new InvalidRequestException("No columns are defined for Materialized View other than primary key");
 +
 +        CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily());
 +        add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey);
 +        add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn);
 +        add(cfm, includedColumns, cfmBuilder::addRegularColumn);
 +        cfmBuilder.withId(properties.properties.getId());
 +        TableParams params = properties.properties.asNewTableParams();
 +        CFMetaData viewCfm = cfmBuilder.build().params(params);
 +        ViewDefinition definition = new ViewDefinition(keyspace(),
 +                                                       columnFamily(),
 +                                                       Schema.instance.getId(keyspace(), baseName.getColumnFamily()),
 +                                                       baseName.getColumnFamily(),
 +                                                       included.isEmpty(),
 +                                                       rawSelect,
 +                                                       whereClauseText,
 +                                                       viewCfm);
 +
 +        try
 +        {
 +            MigrationManager.announceNewView(definition, isLocalOnly);
 +            return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
 +        }
 +        catch (AlreadyExistsException e)
 +        {
 +            if (ifNotExists)
 +                return null;
 +            throw e;
 +        }
 +    }
 +
 +    private static boolean getColumnIdentifier(CFMetaData cfm,
 +                                               Set<ColumnIdentifier> basePK,
 +                                               boolean hasNonPKColumn,
 +                                               ColumnIdentifier.Raw raw,
 +                                               List<ColumnIdentifier> columns,
 +                                               StatementRestrictions restrictions)
 +    {
 +        ColumnIdentifier identifier = raw.prepare(cfm);
 +        ColumnDefinition def = cfm.getColumnDefinition(identifier);
 +
 +        boolean isPk = basePK.contains(identifier);
 +        if (!isPk && hasNonPKColumn)
 +            throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view partition key", identifier));
 +
 +        // We don't need to include the "IS NOT NULL" filter on a non-composite partition key
 +        // because we will never allow a single partition key to be NULL
 +        boolean isSinglePartitionKey = cfm.getColumnDefinition(identifier).isPartitionKey()
 +                                       && cfm.partitionKeyColumns().size() == 1;
 +        if (!isSinglePartitionKey && !restrictions.isRestricted(def))
 +            throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", identifier));
 +
 +        columns.add(identifier);
 +        return !isPk;
 +    }
 +}
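
announceMigration() above enforces the materialized-view invariants: every base-table primary key column must appear in the view's PRIMARY KEY, every view PK column must be restricted with IS NOT NULL, and at most one non-PK base column may enter the view's partition key. A made-up schema that satisfies those rules, printed from Java purely for illustration:

// Both base PK columns (user, game) appear in the view's PRIMARY KEY, every
// view PK column is filtered with IS NOT NULL, and only one non-PK base
// column (score) is promoted into the view's key. Schema names are invented.
public class MaterializedViewExample
{
    public static void main(String[] args)
    {
        String baseTable =
            "CREATE TABLE ks.scores (user text, game text, score int, " +
            "PRIMARY KEY (user, game))";
        String view =
            "CREATE MATERIALIZED VIEW ks.scores_by_game AS " +
            "SELECT * FROM ks.scores " +
            "WHERE user IS NOT NULL AND game IS NOT NULL AND score IS NOT NULL " +
            "PRIMARY KEY (game, score, user)";
        System.out.println(baseTable);
        System.out.println(view);
    }
}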

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a9bb121,5cfa94b..904adca
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -359,21 -256,19 +359,21 @@@ public class SelectStatement implement
          else if (restrictions.keyIsInRelation())
          {
              logger.warn("Aggregation query used on multiple partition keys (IN restriction)");
-             ClientWarn.warn("Aggregation query used on multiple partition keys (IN restriction)");
+             ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)");
          }
  
 -        Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
 +        Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
          while (!pager.isExhausted())
          {
 -            for (Row row : pager.fetchPage(pageSize))
 +            try (PartitionIterator iter = pager.fetchPage(pageSize))
              {
 -                // Not columns match the query, skip
 -                if (row.cf == null)
 -                    continue;
 -
 -                processColumnFamily(row.key.getKey(), row.cf, options, now, result);
 +                while (iter.hasNext())
 +                {
 +                    try (RowIterator partition = iter.next())
 +                    {
 +                        processPartition(partition, options, result, nowInSec);
 +                    }
 +                }
              }
          }
          return new ResultMessage.Rows(result.build(options.getProtocolVersion()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 3f0695c,cd86336..668a189
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -232,713 -95,55 +232,713 @@@ public abstract class ReadCommand imple
          return this;
      }
  
 -    public String getColumnFamilyName()
 +    /**
 +     * Sets the digest version, for when digest for that command is requested.
 +     * <p>
 +     * Note that we allow setting this independently of setting the command as a digest query as
 +     * this allows us to use the command as a carrier of the digest version even if we only call
 +     * setIsDigestQuery on some copy of it.
 +     *
 +     * @param digestVersion the version for the digest if this command is used for a digest query.
 +     * @return this read command.
 +     */
 +    public ReadCommand setDigestVersion(int digestVersion)
      {
 -        return cfName;
 +        this.digestVersion = digestVersion;
 +        return this;
      }
  
 +    /**
 +     * Whether this query is for thrift or not.
 +     *
 +     * @return whether this query is for thrift.
 +     */
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    /**
 +     * The clustering index filter this command uses for the provided key.
 +     * <p>
 +     * Note that this method should only be called on a key actually queried by this command
 +     * and in practice, this will almost always return the same filter, but for the sake of
 +     * paging, the filter on the first key of a range command might be slightly different.
 +     *
 +     * @param key a partition key queried by this command.
 +     *
 +     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
 +     */
 +    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
 +
 +    /**
 +     * Returns a copy of this command.
 +     *
 +     * @return a copy of this command.
 +     */
      public abstract ReadCommand copy();
  
 -    public abstract Row getRow(Keyspace keyspace);
 +    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +
 +    protected abstract int oldestUnrepairedTombstone();
 +
 +    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
 +    {
 +        return isDigestQuery()
 +             ? ReadResponse.createDigestResponse(iterator, digestVersion)
 +             : ReadResponse.createDataResponse(iterator, selection);
 +    }
 +
 +    public long indexSerializedSize(int version)
 +    {
 +        if (index.isPresent())
 +            return IndexMetadata.serializer.serializedSize(index.get(), version);
 +        else
 +            return 0;
 +    }
 +
 +    public Index getIndex(ColumnFamilyStore cfs)
 +    {
 +        // if we've already consulted the index manager, and it returned a valid index
 +        // the result should be cached here.
 +        if(index.isPresent())
 +            return cfs.indexManager.getIndex(index.get());
 +
 +        // if no cached index is present, but we've already consulted the index manager
 +        // then no registered index is suitable for this command, so just return null.
 +        if (indexManagerQueried)
 +            return null;
 +
 +        // do the lookup, set the flag to indicate so and cache the result if not null
 +        Index selected = cfs.indexManager.getBestIndexFor(this);
 +        indexManagerQueried = true;
  
 -    public abstract IDiskAtomFilter filter();
 +        if (selected == null)
 +            return null;
  
 -    public String getKeyspace()
 +        index = Optional.of(selected.getIndexMetadata());
 +        return selected;
 +    }
 +
 +    /**
 +     * Executes this command on the local host.
 +     *
 +     * @param orderGroup the operation group spanning this command
 +     *
 +     * @return an iterator over the result of executing this command locally.
 +     */
 +    @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
 +                                  // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
 +    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
      {
 -        return ksName;
 +        long startTimeNanos = System.nanoTime();
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
 +        Index index = getIndex(cfs);
 +
 +        Index.Searcher searcher = null;
 +        if (index != null)
 +        {
 +            if (!cfs.indexManager.isIndexQueryable(index))
 +                throw new IndexNotAvailableException(index);
 +
 +            searcher = index.searcherFor(this);
 +            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
 +        }
 +
 +        UnfilteredPartitionIterator resultIterator = searcher == null
 +                                         ? queryStorage(cfs, orderGroup)
 +                                         : searcher.search(orderGroup);
 +
 +        try
 +        {
 +            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 +
 +            // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
 +            // no point in checking it again.
 +            RowFilter updatedFilter = searcher == null
 +                                    ? rowFilter()
 +                                    : index.getPostIndexQueryFilter(rowFilter());
 +
 +            // TODO: We currently do filtering via the rowFilter here because it's convenient. However,
 +            // we'll probably want to optimize by pushing it down a layer (as for dropped columns), as it
 +            // would be more efficient (the sooner we discard data we know we don't care about, the less
 +            // useless processing we do on it).
 +            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            resultIterator.close();
 +            throw e;
 +        }
      }
  
 -    // maybeGenerateRetryCommand is used to generate a retry for short reads
 -    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
 +    protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 +
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
      {
 -        return null;
 +        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
      }
  
 -    // maybeTrim removes columns from a response that is too long
 -    public Row maybeTrim(Row row)
 +    public ReadOrderGroup startOrderGroup()
      {
 -        return row;
 +        return ReadOrderGroup.forCommand(this);
      }
  
 -    public long getTimeout()
 +    /**
 +     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
 +     * This also logs a warning or throws a TombstoneOverwhelmingException if appropriate.
 +     */
 +    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
      {
 -        return DatabaseDescriptor.getReadRpcTimeout();
 +        class MetricRecording extends Transformation<UnfilteredRowIterator>
 +        {
 +            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
 +            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 +
 +            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +
 +            private int liveRows = 0;
 +            private int tombstones = 0;
 +
 +            private DecoratedKey currentKey;
 +
 +            @Override
 +            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +            {
 +                currentKey = iter.partitionKey();
 +                return Transformation.apply(iter, this);
 +            }
 +
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                return applyToRow(row);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (row.hasLiveData(ReadCommand.this.nowInSec()))
 +                    ++liveRows;
 +
 +                for (Cell cell : row.cells())
 +                {
 +                    if (!cell.isLive(ReadCommand.this.nowInSec()))
 +                        countTombstone(row.clustering());
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                countTombstone(marker.clustering());
 +                return marker;
 +            }
 +
 +            private void countTombstone(ClusteringPrefix clustering)
 +            {
 +                ++tombstones;
 +                if (tombstones > failureThreshold && respectTombstoneThresholds)
 +                {
 +                    String query = ReadCommand.this.toCQLString();
 +                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
 +                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
 +                }
 +            }
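 +            // Worked example (illustrative): with the default tombstone_failure_threshold of 100000,
 +            // scanning the 100,001st tombstone on a non-system table aborts the read here with a
 +            // TombstoneOverwhelmingException.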
 +
 +            @Override
 +            public void onClose()
 +            {
 +                recordLatency(metric, System.nanoTime() - startTimeNanos);
 +
 +                metric.tombstoneScannedHistogram.update(tombstones);
 +                metric.liveScannedHistogram.update(liveRows);
 +
 +                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
 +                if (warnTombstones)
 +                {
 +                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
-                     ClientWarn.warn(msg);
++                    ClientWarn.instance.warn(msg);
 +                    logger.warn(msg);
 +                }
 +
 +                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
 +            }
 +        };
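 +        // Worked example (illustrative): with the default tombstone_warn_threshold of 1000, a read
 +        // scanning 150 live rows and 1200 tombstone cells would, on close, update both histograms and
 +        // emit a client warning plus a log line of the form
 +        //   "Read 150 live rows and 1200 tombstone cells for query ... (see tombstone_warn_threshold)".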
 +
 +        return Transformation.apply(iter, new MetricRecording());
      }
 -}
  
 -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 -{
 -    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +    /**
 +     * Creates a message for this command.
 +     */
 +    public abstract MessageOut<ReadCommand> createMessage(int version);
 +
 +    protected abstract void appendCQLWhereClause(StringBuilder sb);
 +
 +    // Skip purgeable tombstones. We do this because it's safe (post-merge of the memtable and sstables at least), it
 +    // can save us some bandwidth, and it avoids throwing a TombstoneOverwhelmingException for purgeable tombstones (which
 +    // are to some extent an artefact of compaction lagging behind, so counting them is somewhat unintuitive).
 +    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
 +    {
 +        final boolean isForThrift = iterator.isForThrift();
 +        class WithoutPurgeableTombstones extends PurgeFunction
 +        {
 +            public WithoutPurgeableTombstones()
 +            {
 +                super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            }
 +
 +            protected long getMaxPurgeableTimestamp()
 +            {
 +                return Long.MAX_VALUE;
 +            }
 +        }
 +        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
 +    }
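 +    // Illustrative numbers (not from the patch): with the default gc_grace_seconds of 864000 (10 days),
 +    // cfs.gcBefore(nowInSec()) is roughly nowInSec() - 864000, so tombstones whose deletion time is more
 +    // than ~10 days in the past are dropped here rather than counted against the tombstone thresholds.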
 +
 +    /**
 +     * Recreates the CQL string corresponding to this query.
 +     * <p>
 +     * Note that in general the returned string will not be exactly the original user string: first,
 +     * because there isn't always a single syntax for a given query, but also because we don't have
 +     * all the information needed (we know the non-PK columns queried, but not the PK ones, as internally
 +     * we query them all). So the result shouldn't be relied upon too heavily, but it is good enough for
 +     * debugging purposes, which is what it's for.
 +     */
 +    public String toCQLString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("SELECT ").append(columnFilter());
 +        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
 +        appendCQLWhereClause(sb);
 +
 +        if (limits() != DataLimits.NONE)
 +            sb.append(' ').append(limits());
 +        return sb.toString();
 +    }
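 +    // Example output (hypothetical table and values): a single-partition read might render as
 +    //   SELECT a, b FROM ks.tbl WHERE pk = 1 AND ck > 0 LIMIT 100
 +    // where the WHERE clause comes from appendCQLWhereClause() and the trailing part from limits().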
 +
 +    private static class Serializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        private static int digestFlag(boolean isDigest)
 +        {
 +            return isDigest ? 0x01 : 0;
 +        }
 +
 +        private static boolean isDigest(int flags)
 +        {
 +            return (flags & 0x01) != 0;
 +        }
 +
 +        private static int thriftFlag(boolean isForThrift)
 +        {
 +            return isForThrift ? 0x02 : 0;
 +        }
 +
 +        private static boolean isForThrift(int flags)
 +        {
 +            return (flags & 0x02) != 0;
 +        }
 +
 +        private static int indexFlag(boolean hasIndex)
 +        {
 +            return hasIndex ? 0x04 : 0;
 +        }
 +
 +        private static boolean hasIndex(int flags)
 +        {
 +            return (flags & 0x04) != 0;
 +        }
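 +        // Flags byte worked example: a digest query over a Thrift table with an index present encodes as
 +        // digestFlag(true) | thriftFlag(true) | indexFlag(true) == 0x01 | 0x02 | 0x04 == 0x07.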
 +
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            out.writeByte(command.kind.ordinal());
 +            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
 +            if (command.isDigestQuery())
 +                out.writeUnsignedVInt(command.digestVersion());
 +            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            out.writeInt(command.nowInSec());
 +            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
 +            RowFilter.serializer.serialize(command.rowFilter(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version);
 +            if (command.index.isPresent())
 +                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 +
 +            command.serializeSelection(out, version);
 +        }
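 +        // Resulting 3.0+ wire layout, as a sketch of the writes above:
 +        //   [kind: 1 byte][flags: 1 byte][digestVersion: unsigned vint, digest queries only]
 +        //   [CFMetaData][nowInSec: 4 bytes][ColumnFilter][RowFilter][DataLimits]
 +        //   [IndexMetadata, only when an index is present][kind-specific selection]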
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                return legacyReadCommandSerializer.deserialize(in, version);
 +
 +            Kind kind = Kind.values()[in.readByte()];
 +            int flags = in.readByte();
 +            boolean isDigest = isDigest(flags);
 +            boolean isForThrift = isForThrift(flags);
 +            boolean hasIndex = hasIndex(flags);
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
 +            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
 +            int nowInSec = in.readInt();
 +            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
 +            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, version);
 +            Optional<IndexMetadata> index = hasIndex
 +                                            ? deserializeIndexMetadata(in, version, metadata)
 +                                            : Optional.empty();
 +
 +            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
 +        }
 +
 +        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
 +        {
 +            try
 +            {
 +                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
 +            }
 +            catch (UnknownIndexException e)
 +            {
 +                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
 +                                               "If an index was just created, this is likely due to the schema not " +
 +                                               "being fully propagated. Local read will proceed without using the " +
 +                                               "index. Please wait for schema agreement after index creation.",
 +                                               cfm.ksName, cfm.cfName, e.indexId.toString());
 +                logger.info(message);
 +                return Optional.empty();
 +            }
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            return 2 // kind + flags
 +                 + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
 +                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
 +                 + TypeSizes.sizeof(command.nowInSec())
 +                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
 +                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
 +                 + DataLimits.serializer.serializedSize(command.limits(), version)
 +                 + command.selectionSerializedSize(version)
 +                 + command.indexSerializedSize(version);
 +        }
 +    }
 +
 +    // Dispatches to either Serializer or LegacyRangeSliceCommandSerializer. Only needed as long as we
 +    // maintain pre-3.0 compatibility.
 +    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                legacyRangeSliceCommandSerializer.serialize(command, out, version);
 +            else
 +                serializer.serialize(command, out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
 +                 : serializer.deserialize(in, version);
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
 +                 : serializer.serializedSize(command, version);
 +        }
 +    }
 +
 +    private enum LegacyType
 +    {
 +        GET_BY_NAMES((byte)1),
 +        GET_SLICES((byte)2);
 +
 +        public final byte serializedValue;
 +
 +        LegacyType(byte b)
 +        {
 +            this.serializedValue = b;
 +        }
 +
 +        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
 +        {
 +            return kind == ClusteringIndexFilter.Kind.SLICE
 +                   ? GET_SLICES
 +                   : GET_BY_NAMES;
 +        }
 +
 +        public static LegacyType fromSerializedValue(byte b)
 +        {
 +            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +        }
 +    }
 +
 +    /**
 +     * Serializer for pre-3.0 RangeSliceCommands.
 +     */
 +    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
      {
 -        out.writeByte(command.commandType.serializedValue);
 -        switch (command.commandType)
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            assert !rangeCommand.dataRange().isPaging();
 +
 +            // convert pre-3.0 incompatible names filters to slice filters
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            out.writeUTF(metadata.ksName);
 +            out.writeUTF(metadata.cfName);
 +            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
 +
 +            // begin DiskAtomFilterSerializer.serialize()
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                out.writeByte(1);  // 0 for slices, 1 for names
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
 +            }
 +            else
 +            {
 +                out.writeByte(0);  // 0 for slices, 1 for names
 +
 +                // slice filter serialization
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
 +
 +                out.writeBoolean(filter.isReversed());
 +
 +                // limit
 +                DataLimits.Kind kind = rangeCommand.limits().kind();
 +                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
 +                if (isDistinct)
 +                    out.writeInt(1);
 +                else
 +                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
 +
 +                int compositesToGroup;
 +                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                if (kind == DataLimits.Kind.THRIFT_LIMIT)
 +                    compositesToGroup = -1;
 +                else if (isDistinct && !selectsStatics)
 +                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
 +                else
 +                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
 +
 +                out.writeInt(compositesToGroup);
 +            }
 +
 +            serializeRowFilter(out, rangeCommand.rowFilter());
 +            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
 +
 +            // maxResults
 +            out.writeInt(rangeCommand.limits().count());
 +
 +            // countCQL3Rows
 +            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
 +                out.writeBoolean(false);
 +            else
 +                out.writeBoolean(true);
 +
 +            // isPaging
 +            out.writeBoolean(false);
 +        }
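 +        // compositesToGroup recap (illustrative): -1 for Thrift limits or dense tables (no grouping),
 +        // -2 for DISTINCT queries without statics (CASSANDRA-8490), otherwise the clustering column count;
 +        // e.g. a plain SELECT over a CQL table with two clustering columns writes compositesToGroup = 2.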
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            String keyspace = in.readUTF();
 +            String columnFamily = in.readUTF();
 +
 +            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 +            if (metadata == null)
 +            {
 +                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
 +                throw new UnknownColumnFamilyException(message, null);
 +            }
 +
 +            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
 +
 +            ClusteringIndexFilter filter;
 +            ColumnFilter selection;
 +            int compositesToGroup = 0;
 +            int perPartitionLimit = -1;
 +            byte readType = in.readByte();  // 0 for slices, 1 for names
 +            if (readType == 1)
 +            {
 +                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
 +                selection = selectionAndFilter.left;
 +                filter = selectionAndFilter.right;
 +            }
 +            else
 +            {
 +                Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
 +                filter = p.left;
 +                perPartitionLimit = in.readInt();
 +                compositesToGroup = in.readInt();
 +                selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
 +            }
 +
 +            RowFilter rowFilter = deserializeRowFilter(in, metadata);
 +
 +            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
 +            int maxResults = in.readInt();
 +
 +            in.readBoolean();  // countCQL3Rows (not needed)
 +            in.readBoolean();  // isPaging (not needed)
 +
 +            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
 +            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
 +            DataLimits limits;
 +            if (isDistinct)
 +                limits = DataLimits.distinctLimits(maxResults);
 +            else if (compositesToGroup == -1)
 +                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
 +            else
 +                limits = DataLimits.cqlLimits(maxResults);
 +
 +            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
 +        }
 +
 +        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
 +        {
 +            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
 +            out.writeInt(indexExpressions.size());
 +            for (RowFilter.Expression expression : indexExpressions)
 +            {
 +                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
 +                expression.operator().writeTo(out);
 +                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
 +            }
 +        }
 +
 +        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
 +        {
 +            int numRowFilters = in.readInt();
 +            if (numRowFilters == 0)
 +                return RowFilter.NONE;
 +
 +            RowFilter rowFilter = RowFilter.create(numRowFilters);
 +            for (int i = 0; i < numRowFilters; i++)
 +            {
 +                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
 +                ColumnDefinition column = metadata.getColumnDefinition(columnName);
 +                Operator op = Operator.readFrom(in);
 +                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
 +                rowFilter.add(column, op, indexValue);
 +            }
 +            return rowFilter;
 +        }
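 +        // Legacy row-filter wire format, matching serializeRowFilter/deserializeRowFilter above:
 +        //   [expression count: 4 bytes], then per expression:
 +        //   [column name: short length + bytes][operator: 4 bytes][value: short length + bytes]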
 +
 +        static long serializedRowFilterSize(RowFilter rowFilter)
 +        {
 +            long size = TypeSizes.sizeof(0);  // rowFilterCount
 +            for (RowFilter.Expression expression : rowFilter)
 +            {
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                size += TypeSizes.sizeof(0);  // operator int value
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +            }
 +            return size;
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version < MessagingService.VERSION_30;
 +            assert command.kind == Kind.PARTITION_RANGE;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            long size = TypeSizes.sizeof(metadata.ksName);
 +            size += TypeSizes.sizeof(metadata.cfName);
 +            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
 +
 +            size += 1;  // single byte flag: 0 for slices, 1 for names
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
 +            }
 +            else
 +            {
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
 +                size += TypeSizes.sizeof(filter.isReversed());
 +                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
 +                size += TypeSizes.sizeof(0); // compositesToGroup
 +            }
 +
 +            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
 +            {
 +                size += TypeSizes.sizeof(0);
 +            }
 +            else
 +            {
 +                ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
 +                size += TypeSizes.sizeof(indexExpressions.size());
 +                for (RowFilter.Expression expression : indexExpressions)
 +                {
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                    size += TypeSizes.sizeof(expression.operator().ordinal());
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +                }
 +            }
 +
 +            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
 +            size += TypeSizes.sizeof(rangeCommand.limits().count());
 +            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
 +            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
 +        }
 +
 +        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
          {
 -            case GET_BY_NAMES:
 -                SliceByNamesReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            case GET_SLICES:
 -                SliceFromReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            default:
 -                throw new AssertionError();
 +            if (!command.dataRange().isNamesQuery())
 +                return command;
 +
 +            CFMetaData metadata = command.metadata();
 +            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
 +                return command;
 +
 +            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
 +            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
 +            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
 +            return new PartitionRangeReadCommand(
 +                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
 +                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
 +        }
 +
 +        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
 +        {
 +            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
 +            // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
 +            // all columns so that we get at least one cell if there is a live row, as pre-3.0 nodes would otherwise be confused.
 +            if (compositesToGroup == -2)
 +                return ColumnFilter.all(metadata);
 +
 +            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
 +            PartitionColumns columns = selectsStatics
 +                                     ? metadata.partitionColumns()
 +                                     : metadata.partitionColumns().withoutStatics();
 +            return ColumnFilter.selectionBuilder().addAll(columns).build();
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index ccc900b,459923b..d416dca
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -39,8 -39,12 +39,9 @@@ import com.google.common.collect.Lists
  import com.google.common.collect.Sets;
  
  import org.cliffc.high_scale_lib.NonBlockingHashMap;
 -
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 -
 -import org.apache.cassandra.concurrent.ExecutorLocal;
+ import org.apache.cassandra.concurrent.ExecutorLocals;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;