You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/22 22:11:23 UTC

[8/8] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa60cde3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa60cde3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa60cde3

Branch: refs/heads/cassandra-3.0
Commit: aa60cde3122a2b512ba4283b2bfd2deaff008004
Parents: 96eb58a 4a849ef
Author: blerer <be...@datastax.com>
Authored: Tue Sep 22 22:04:30 2015 +0200
Committer: blerer <be...@datastax.com>
Committed: Tue Sep 22 22:10:30 2015 +0200

----------------------------------------------------------------------
 NEWS.txt                                        | 16 +++---
 conf/logback.xml                                | 55 ++++++++++++++++---
 .../cassandra/auth/CassandraAuthorizer.java     |  2 +-
 .../cassandra/auth/CassandraRoleManager.java    |  4 +-
 .../cassandra/auth/PasswordAuthenticator.java   |  4 +-
 .../apache/cassandra/auth/PermissionsCache.java |  2 +-
 .../org/apache/cassandra/auth/RolesCache.java   |  2 +-
 .../cassandra/batchlog/BatchlogManager.java     | 10 ++--
 .../batchlog/LegacyBatchlogMigrator.java        |  8 +--
 .../apache/cassandra/cache/AutoSavingCache.java |  8 +--
 .../cassandra/cache/SerializingCache.java       |  2 +-
 .../org/apache/cassandra/client/RingCache.java  |  2 +-
 .../DebuggableScheduledThreadPoolExecutor.java  |  2 +-
 .../DebuggableThreadPoolExecutor.java           |  2 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  6 +--
 .../cql3/functions/JavaBasedUDFunction.java     |  4 +-
 .../cassandra/cql3/functions/UDFunction.java    |  2 +-
 .../cql3/statements/CreateIndexStatement.java   |  2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 44 +++++++--------
 .../apache/cassandra/db/ConsistencyLevel.java   |  6 +--
 .../db/CounterMutationVerbHandler.java          |  2 +-
 .../db/DefinitionsUpdateVerbHandler.java        |  2 +-
 .../org/apache/cassandra/db/Directories.java    |  8 +--
 src/java/org/apache/cassandra/db/Keyspace.java  |  8 +--
 src/java/org/apache/cassandra/db/Memtable.java  | 16 +++---
 .../db/MigrationRequestVerbHandler.java         |  2 +-
 .../cassandra/db/SchemaCheckVerbHandler.java    |  2 +-
 .../cassandra/db/SizeEstimatesRecorder.java     |  6 +--
 .../org/apache/cassandra/db/SystemKeyspace.java |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       |  6 +--
 .../db/commitlog/CommitLogArchiver.java         |  4 +-
 .../db/commitlog/CommitLogReplayer.java         | 22 ++++----
 .../db/commitlog/CommitLogSegmentManager.java   | 14 ++---
 .../db/compaction/CompactionController.java     |  4 +-
 .../db/compaction/CompactionManager.java        | 30 +++++------
 .../compaction/CompactionStrategyManager.java   |  2 +-
 .../cassandra/db/compaction/CompactionTask.java |  8 +--
 .../DateTieredCompactionStrategy.java           | 10 ++--
 .../compaction/LeveledCompactionStrategy.java   |  4 +-
 .../db/compaction/LeveledManifest.java          | 32 +++++------
 .../SizeTieredCompactionStrategy.java           |  4 +-
 .../SplittingSizeTieredCompactionWriter.java    |  4 +-
 .../db/lifecycle/LifecycleTransaction.java      | 20 +++----
 .../cassandra/db/lifecycle/LogTransaction.java  | 16 +++---
 .../apache/cassandra/db/lifecycle/Tracker.java  |  8 +--
 .../org/apache/cassandra/dht/BootStrapper.java  |  4 +-
 .../org/apache/cassandra/dht/RangeStreamer.java | 12 ++---
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |  4 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  6 +--
 ...mitedLocalNodeFirstLocalBalancingPolicy.java | 14 ++---
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  2 +-
 .../apache/cassandra/hints/HintVerbHandler.java |  2 +-
 .../cassandra/hints/HintsDispatchExecutor.java  |  2 +-
 .../cassandra/index/SecondaryIndexManager.java  |  6 +--
 .../index/internal/CassandraIndex.java          |  6 +--
 .../io/sstable/IndexSummaryManager.java         | 10 ++--
 .../apache/cassandra/io/sstable/SSTable.java    |  2 +-
 .../io/sstable/format/SSTableReader.java        | 24 ++++-----
 .../io/sstable/metadata/MetadataSerializer.java |  8 +--
 .../org/apache/cassandra/io/util/FileUtils.java |  8 +--
 .../locator/AbstractReplicationStrategy.java    |  2 +-
 .../locator/NetworkTopologyStrategy.java        |  2 +-
 .../cassandra/locator/PropertyFileSnitch.java   |  6 +--
 .../locator/ReconnectableSnitchHelper.java      |  2 +-
 .../apache/cassandra/locator/TokenMetadata.java |  8 +--
 .../net/IncomingStreamingConnection.java        |  4 +-
 .../cassandra/net/IncomingTcpConnection.java    | 10 ++--
 .../cassandra/net/MessageDeliveryTask.java      |  2 +-
 .../apache/cassandra/net/MessagingService.java  | 20 +++----
 .../cassandra/net/OutboundTcpConnection.java    | 12 ++---
 .../cassandra/net/ResponseVerbHandler.java      |  2 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |  2 +-
 .../cassandra/service/DigestResolver.java       |  8 +--
 .../apache/cassandra/service/GCInspector.java   |  4 +-
 .../cassandra/service/LoadBroadcaster.java      |  4 +-
 .../apache/cassandra/service/ReadCallback.java  |  4 +-
 .../apache/cassandra/service/StorageProxy.java  | 14 ++---
 .../cassandra/thrift/CassandraServer.java       | 56 ++++++++++----------
 .../thrift/CustomTThreadPoolServer.java         |  4 +-
 .../cassandra/thrift/ThriftValidation.java      |  4 +-
 .../org/apache/cassandra/tracing/Tracing.java   |  2 +-
 .../org/apache/cassandra/transport/Message.java |  6 +--
 .../cassandra/triggers/CustomClassLoader.java   |  2 +-
 .../org/apache/cassandra/utils/CLibrary.java    |  2 +-
 .../org/apache/cassandra/utils/Mx4jTool.java    |  4 +-
 .../apache/cassandra/utils/OutputHandler.java   |  2 +-
 .../org/apache/cassandra/utils/TopKSampler.java |  2 +-
 87 files changed, 383 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 67398cf,6bd0a77..e4b8663
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -122,6 -27,16 +122,12 @@@ Changed Default
     - commitlog_total_space_in_mb will use the smaller of 8192, and 1/4
       of the total space of the commitlog volume. (Before: always used
       8192)
 -   - Incremental repair is on by default since 2.2.0, run full repairs by
 -     providing the '-full' parameter to nodetool repair.
 -   - Parallel repairs are the default since 2.2.0, run sequential repairs
 -     by providing the '-seq' parameter to nodetool repair.
+    - The following INFO logs were reduced to DEBUG level and will now appear
+      in debug.log instead of system.log:
+       - Memtable flushing actions
+       - Commit log replayed files
+       - Compacted sstables
+       - SSTable opening (SSTableReader)
  
  New features
  ------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 934ebaa,0000000..8bc4c26
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@@ -1,554 -1,0 +1,554 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.batchlog;
 +
 +import java.io.IOException;
 +import java.lang.management.ManagementFactory;
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +
 +import javax.management.MBeanServer;
 +import javax.management.ObjectName;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.*;
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.UUIDType;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.WriteFailureException;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.hints.Hint;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.WriteResponseHandler;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +import static com.google.common.collect.Iterables.transform;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
 +
 +public class BatchlogManager implements BatchlogManagerMBean
 +{
 +    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
 +    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
 +    static final int DEFAULT_PAGE_SIZE = 128;
 +
 +    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
 +    public static final BatchlogManager instance = new BatchlogManager();
 +
 +    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
 +    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
 +
 +    // Single-thread executor service for scheduling and serializing log replay.
 +    private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
 +
 +    /**
 +     * Registers this manager as an MBean under {@link #MBEAN_NAME} and schedules the recurring batchlog replay.
 +     * The first replay runs after {@code StorageService.RING_DELAY} milliseconds; each subsequent one starts
 +     * {@code REPLAY_INTERVAL} milliseconds after the previous run has finished.
 +     *
 +     * @throws RuntimeException wrapping any exception raised while registering the MBean
 +     */
 +    public void start()
 +    {
 +        MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
 +        try
 +        {
 +            platformServer.registerMBean(this, new ObjectName(MBEAN_NAME));
 +        }
 +        catch (Exception registrationFailure)
 +        {
 +            throw new RuntimeException(registrationFailure);
 +        }
 +
 +        batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
 +    }
 +
 +    /**
 +     * Stops the replay executor from accepting new tasks, then waits up to 60 seconds for it to terminate.
 +     * The outcome of the wait is not reported to the caller.
 +     *
 +     * @throws InterruptedException if the calling thread is interrupted while waiting
 +     */
 +    public void shutdown() throws InterruptedException
 +    {
 +        final long maxWaitSeconds = 60;
 +        batchlogTasks.shutdown();
 +        batchlogTasks.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS);
 +    }
 +
 +    /**
 +     * Deletes the batch with the given id by applying a full-partition delete to the batches table.
 +     *
 +     * @param id identifier (partition key) of the batch to delete
 +     */
 +    public static void remove(UUID id)
 +    {
 +        PartitionUpdate wholePartitionDelete = PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
 +                                                                                   UUIDType.instance.decompose(id),
 +                                                                                   FBUtilities.timestampMicros(),
 +                                                                                   FBUtilities.nowInSeconds());
 +        Mutation deletion = new Mutation(wholePartitionDelete);
 +        deletion.apply();
 +    }
 +
 +    /**
 +     * Stores the batch with durable writes enabled.
 +     *
 +     * @param batch the batch to persist
 +     */
 +    public static void store(Batch batch)
 +    {
 +        final boolean durableWrites = true;
 +        store(batch, durableWrites);
 +    }
 +
 +    /**
 +     * Writes the batch as a single row of the batches table: the current messaging version plus one
 +     * "mutations" list entry per mutation of the batch.
 +     *
 +     * @param batch         the batch to persist
 +     * @param durableWrites passed through to the apply call of the resulting mutation
 +     */
 +    public static void store(Batch batch, boolean durableWrites)
 +    {
 +        RowUpdateBuilder row = new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id).clustering()
 +                                                                                                         .add("version", MessagingService.current_version);
 +
 +        // Mutations that are already in serialized form are stored untouched.
 +        for (ByteBuffer alreadyEncoded : batch.encodedMutations)
 +            row.addListEntry("mutations", alreadyEncoded);
 +
 +        // Mutations still in object form are serialized at the current messaging version first.
 +        for (Mutation toEncode : batch.decodedMutations)
 +        {
 +            try (DataOutputBuffer out = new DataOutputBuffer())
 +            {
 +                Mutation.serializer.serialize(toEncode, out, MessagingService.current_version);
 +                row.addListEntry("mutations", out.buffer());
 +            }
 +            catch (IOException unexpected)
 +            {
 +                // serializing into an in-memory buffer is not expected to fail
 +                throw new AssertionError(unexpected);
 +            }
 +        }
 +
 +        row.build().apply(durableWrites);
 +    }
 +
 +    @VisibleForTesting
 +    public int countAllBatches()
 +    {
 +        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
 +        UntypedResultSet results = executeInternal(query);
 +        if (results == null || results.isEmpty())
 +            return 0;
 +
 +        return (int) results.one().getLong("count");
 +    }
 +
 +    /**
 +     * @return the running total of batches this manager has replayed or discarded during replay
 +     */
 +    public long getTotalBatchesReplayed()
 +    {
 +        return this.totalBatchesReplayed;
 +    }
 +
 +    public void forceBatchlogReplay() throws Exception
 +    {
 +        startBatchlogReplay().get();
 +    }
 +
 +    /**
 +     * Queues a replay on the batchlog executor without waiting for it.
 +     *
 +     * @return a future that completes when the queued replay has run
 +     */
 +    public Future<?> startBatchlogReplay()
 +    {
 +        // The executor runs tasks one at a time, so a replay already in progress finishes before this one starts.
 +        Runnable replayTask = this::replayFailedBatches;
 +        return batchlogTasks.submit(replayTask);
 +    }
 +
 +    /**
 +     * Runs one replay on the batchlog executor and waits for it to finish. Used for testing only.
 +     */
 +    void performInitialReplay() throws InterruptedException, ExecutionException
 +    {
 +        Future<?> initialReplay = batchlogTasks.submit(this::replayFailedBatches);
 +        initialReplay.get();
 +    }
 +
 +    /**
 +     * Replays every stored batch whose id is newer than the last replayed one and older than the batchlog
 +     * timeout, paging through the batches table and throttling the replay by the configured rate.
 +     *
 +     * Does nothing when token metadata reports no endpoints: there is nobody to replay to, and the per-node
 +     * throttle below would otherwise divide by zero.
 +     */
 +    private void replayFailedBatches()
 +    {
-         logger.debug("Started replayFailedBatches");
++        logger.trace("Started replayFailedBatches");
 +
 +        int endpointCount = StorageService.instance.getTokenMetadata().getAllEndpoints().size();
 +        if (endpointCount <= 0)
 +        {
 +            // NOTE(review): an ArithmeticException thrown here would also be raised inside the periodic task
 +            // scheduled by start(); a plain ScheduledExecutorService cancels a periodic task that throws.
 +            logger.trace("Replay cancelled as there are no endpoints in the ring");
 +            return;
 +        }
 +
 +        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
 +        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
 +        // The share is computed in floating point so that a configured throttle smaller than the node count
 +        // does not truncate to 0 and silently turn the limit off.
 +        int configuredThrottleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB();
 +        double throttleInBytes = configuredThrottleInKB * 1024.0 / endpointCount;
 +        RateLimiter rateLimiter = RateLimiter.create(configuredThrottleInKB <= 0 ? Double.MAX_VALUE : throttleInBytes);
 +
 +        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
 +        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
 +        int pageSize = calculatePageSize(store);
 +        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
 +        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
 +        // token(id) > token(lastReplayedUuid) as part of the query.
 +        String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.BATCHES);
 +        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
 +        processBatchlogEntries(batches, pageSize, rateLimiter);
 +        lastReplayedUuid = limitUuid;
-         logger.debug("Finished replayFailedBatches");
++        logger.trace("Finished replayFailedBatches");
 +    }
 +
 +    // read less rows (batches) per page if they are very large
 +    static int calculatePageSize(ColumnFamilyStore store)
 +    {
 +        double averageRowSize = store.getMeanPartitionSize();
 +        if (averageRowSize <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
 +    }
 +
 +    /**
 +     * Replays each row of the given result set, waits for the outcome one page at a time, flushes any hints
 +     * that were written, and only then deletes the replayed batches.
 +     *
 +     * @param batches     rows of the batches table to replay
 +     * @param pageSize    number of rows after which outstanding replays are waited on
 +     * @param rateLimiter limiter handed to every batch replay
 +     */
 +    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
 +    {
 +        Set<UUID> replayedBatches = new HashSet<>();
 +        Set<InetAddress> hintedNodes = new HashSet<>();
 +        ArrayList<ReplayingBatch> inFlight = new ArrayList<>(pageSize);
 +        int rowsInCurrentPage = 0;
 +
 +        // Batches are sent out without waiting on each one, so that a single stuck batch doesn't hold up the others
 +        for (UntypedResultSet.Row row : batches)
 +        {
 +            UUID id = row.getUUID("id");
 +            int version = row.getInt("version");
 +            try
 +            {
 +                ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
 +                boolean awaitingResponses = batch.replay(rateLimiter, hintedNodes) > 0;
 +                if (awaitingResponses)
 +                {
 +                    inFlight.add(batch);
 +                }
 +                else
 +                {
 +                    // nothing was sent (batch expired, or every table involved was truncated): drop it right away
 +                    remove(id);
 +                    ++totalBatchesReplayed;
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Skipped batch replay of {} due to {}", id, e);
 +                remove(id);
 +            }
 +
 +            rowsInCurrentPage++;
 +            if (rowsInCurrentPage == pageSize)
 +            {
 +                // End of a page. To keep at most one page of mutations in memory, finish processing this page
 +                // before requesting the next row.
 +                finishAndClearBatches(inFlight, hintedNodes, replayedBatches);
 +                rowsInCurrentPage = 0;
 +            }
 +        }
 +
 +        // whatever is left of the final, partially filled page
 +        finishAndClearBatches(inFlight, hintedNodes, replayedBatches);
 +
 +        // to preserve batch guarantees, hints (if any) must have made it to disk before the batches are deleted
 +        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
 +
 +        // all generated hints are fsynced by now, so the batches can actually be deleted
 +        for (UUID replayedId : replayedBatches)
 +            remove(replayedId);
 +    }
 +
 +    /**
 +     * Waits on every batch in the list, records its id as replayed, adds the number of batches to the
 +     * replay counter and empties the list.
 +     *
 +     * @param batches         batches whose replays are outstanding; cleared on return
 +     * @param hintedNodes     collects the endpoints for which hints get written
 +     * @param replayedBatches collects the ids of the finished batches
 +     */
 +    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
 +    {
 +        int finishedCount = 0;
 +        for (ReplayingBatch pending : batches)
 +        {
 +            // writes hints for deliveries that timed out or failed
 +            pending.finish(hintedNodes);
 +            replayedBatches.add(pending.id);
 +            finishedCount++;
 +        }
 +
 +        totalBatchesReplayed += finishedCount;
 +        batches.clear();
 +    }
 +
 +    /**
 +     * @return twice the write RPC timeout: enough time for the actual write plus the batchlog removal mutation
 +     */
 +    public static long getBatchlogTimeout()
 +    {
 +        return 2 * DatabaseDescriptor.getWriteRpcTimeout();
 +    }
 +
 +    /**
 +     * One batchlog entry in the process of being replayed. Deserializes the stored mutations, sends them to the
 +     * replicas, and writes hints for replicas that are down or that do not acknowledge the write.
 +     */
 +    private static class ReplayingBatch
 +    {
 +        private final UUID id;
 +        private final long writtenAt;
 +        private final List<Mutation> mutations;
 +        private final int replayedBytes;
 +
 +        // Index-aligned with mutations: entry i is the handler for mutations.get(i), or null when nothing was sent
 +        // to a live remote endpoint for that mutation. Hint writing relies on this alignment.
 +        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
 +
 +        ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
 +        {
 +            this.id = id;
 +            this.writtenAt = UUIDGen.unixTimestamp(id);
 +            this.mutations = new ArrayList<>(serializedMutations.size());
 +            this.replayedBytes = addMutations(version, serializedMutations);
 +        }
 +
 +        /**
 +         * Sends the mutations of this batch to their replicas.
 +         *
 +         * @param rateLimiter limiter charged with the serialized size of the batch after sending
 +         * @param hintedNodes collects the endpoints for which hints were written instead of sending
 +         * @return the number of mutations that were sent to at least one live endpoint and must be waited on;
 +         *         0 if the batch is empty, has expired, or had no live remote endpoint at all
 +         */
 +        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
 +        {
-             logger.debug("Replaying batch {}", id);
++            logger.trace("Replaying batch {}", id);
 +
 +            if (mutations.isEmpty())
 +                return 0;
 +
 +            int gcgs = gcgs(mutations);
 +            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
 +                return 0;
 +
 +            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
 +
 +            rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
 +
 +            // Null entries are placeholders for mutations that were not sent anywhere; only real handlers count.
 +            int awaited = 0;
 +            for (ReplayWriteResponseHandler<Mutation> handler : replayHandlers)
 +                if (handler != null)
 +                    awaited++;
 +            return awaited;
 +        }
 +
 +        /**
 +         * Waits for the responses of every mutation that was sent. On the first timeout or failure, writes hints
 +         * for that mutation and all following ones, then stops waiting.
 +         *
 +         * @param hintedNodes collects the endpoints for which hints get written
 +         */
 +        public void finish(Set<InetAddress> hintedNodes)
 +        {
 +            for (int i = 0; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 +                if (handler == null)
 +                    continue; // nothing was sent for this mutation, so there is nothing to wait on
 +
 +                try
 +                {
 +                    handler.get();
 +                }
 +                catch (WriteTimeoutException|WriteFailureException e)
 +                {
-                     logger.debug("Failed replaying a batched mutation to a node, will write a hint");
-                     logger.debug("Failure was : {}", e.getMessage());
++                    logger.trace("Failed replaying a batched mutation to a node, will write a hint");
++                    logger.trace("Failure was : {}", e.getMessage());
 +                    // write hints for this mutation and all remaining ones, starting from i
 +                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
 +                    return;
 +                }
 +            }
 +        }
 +
 +        /**
 +         * Deserializes the given mutations and keeps those that still have content after truncation filtering.
 +         *
 +         * @return the total number of serialized bytes read
 +         */
 +        private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
 +        {
 +            int ret = 0;
 +            for (ByteBuffer serializedMutation : serializedMutations)
 +            {
 +                ret += serializedMutation.remaining();
 +                try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
 +                {
 +                    addMutation(Mutation.serializer.deserialize(in, version));
 +                }
 +            }
 +
 +            return ret;
 +        }
 +
 +        // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
 +        // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as
 +        // delivered, then truncated).
 +        private void addMutation(Mutation mutation)
 +        {
 +            for (UUID cfId : mutation.getColumnFamilyIds())
 +                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
 +                    mutation = mutation.without(cfId);
 +
 +            if (!mutation.isEmpty())
 +                mutations.add(mutation);
 +        }
 +
 +        /**
 +         * Writes hints for every endpoint that has not acknowledged its mutation, for all mutations from
 +         * {@code startFrom} onwards. Does nothing if the batch has expired in the meantime.
 +         */
 +        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
 +        {
 +            int gcgs = gcgs(mutations);
 +
 +            // expired
 +            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
 +                return;
 +
 +            for (int i = startFrom; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 +                Mutation undeliveredMutation = mutations.get(i);
 +
 +                // a null handler means the mutation was never sent to a live remote endpoint
 +                if (handler != null)
 +                {
 +                    hintedNodes.addAll(handler.undelivered);
 +                    HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
 +                                                Hint.create(undeliveredMutation, writtenAt));
 +                }
 +            }
 +        }
 +
 +        /**
 +         * Sends every mutation and returns one entry per mutation, in the same order, so that index i of the
 +         * result always belongs to {@code mutations.get(i)}. An entry is null when that mutation was not sent
 +         * to any live remote endpoint.
 +         */
 +        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
 +                                                                              long writtenAt,
 +                                                                              Set<InetAddress> hintedNodes)
 +        {
 +            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
 +            for (Mutation mutation : mutations)
 +                handlers.add(sendSingleReplayMutation(mutation, writtenAt, hintedNodes)); // may be null, kept for alignment
 +            return handlers;
 +        }
 +
 +        /**
 +         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
 +         * when a replica is down or a write request times out.
 +         *
 +         * @return direct delivery handler to wait on or null, if no live nodes found
 +         */
 +        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
 +                                                                                     long writtenAt,
 +                                                                                     Set<InetAddress> hintedNodes)
 +        {
 +            Set<InetAddress> liveEndpoints = new HashSet<>();
 +            String ks = mutation.getKeyspaceName();
 +            Token tk = mutation.key().getToken();
 +
 +            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
 +                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
 +            {
 +                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
 +                {
 +                    mutation.apply();
 +                }
 +                else if (FailureDetector.instance.isAlive(endpoint))
 +                {
 +                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
 +                }
 +                else
 +                {
 +                    hintedNodes.add(endpoint);
 +                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
 +                                                Hint.create(mutation, writtenAt));
 +                }
 +            }
 +
 +            if (liveEndpoints.isEmpty())
 +                return null;
 +
 +            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
 +            MessageOut<Mutation> message = mutation.createMessage();
 +            for (InetAddress endpoint : liveEndpoints)
 +                MessagingService.instance().sendRR(message, endpoint, handler, false);
 +            return handler;
 +        }
 +
 +        /**
 +         * @return the smallest {@code smallestGCGS()} value among the given mutations, or
 +         *         {@code Integer.MAX_VALUE} when the collection is empty
 +         */
 +        private static int gcgs(Collection<Mutation> mutations)
 +        {
 +            int gcgs = Integer.MAX_VALUE;
 +            for (Mutation mutation : mutations)
 +                gcgs = Math.min(gcgs, mutation.smallestGCGS());
 +            return gcgs;
 +        }
 +
 +        /**
 +         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
 +         * which we did not receive a successful reply.
 +         */
 +        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
 +        {
 +            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 +
 +            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
 +            {
 +                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
 +                undelivered.addAll(writeEndpoints);
 +            }
 +
 +            // every endpoint written to must respond
 +            @Override
 +            protected int totalBlockFor()
 +            {
 +                return this.naturalEndpoints.size();
 +            }
 +
 +            @Override
 +            public void response(MessageIn<T> m)
 +            {
 +                // a null message is attributed to the local broadcast address
 +                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
 +                assert removed;
 +                super.response(m);
 +            }
 +        }
 +    }
 +
 +    public static class EndpointFilter
 +    {
 +        private final String localRack;
 +        private final Multimap<String, InetAddress> endpoints;
 +
 +        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
 +        {
 +            this.localRack = localRack;
 +            this.endpoints = endpoints;
 +        }
 +
 +        /**
 +         * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
 +         */
 +        public Collection<InetAddress> filter()
 +        {
 +            // special case for single-node data centers
 +            if (endpoints.values().size() == 1)
 +                return endpoints.values();
 +
 +            // strip out dead endpoints and localhost
 +            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
 +            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
 +                if (isValid(entry.getValue()))
 +                    validated.put(entry.getKey(), entry.getValue());
 +
 +            if (validated.size() <= 2)
 +                return validated.values();
 +
 +            if (validated.size() - validated.get(localRack).size() >= 2)
 +            {
 +                // we have enough endpoints in other racks
 +                validated.removeAll(localRack);
 +            }
 +
 +            if (validated.keySet().size() == 1)
 +            {
 +                // we have only 1 `other` rack
 +                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
 +                return Lists.newArrayList(Iterables.limit(otherRack, 2));
 +            }
 +
 +            // randomize which racks we pick from if more than 2 remaining
 +            Collection<String> racks;
 +            if (validated.keySet().size() == 2)
 +            {
 +                racks = validated.keySet();
 +            }
 +            else
 +            {
 +                racks = Lists.newArrayList(validated.keySet());
 +                Collections.shuffle((List<String>) racks);
 +            }
 +
 +            // grab a random member of up to two racks
 +            List<InetAddress> result = new ArrayList<>(2);
 +            for (String rack : Iterables.limit(racks, 2))
 +            {
 +                List<InetAddress> rackMembers = validated.get(rack);
 +                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
 +            }
 +
 +            return result;
 +        }
 +
 +        @VisibleForTesting
 +        protected boolean isValid(InetAddress input)
 +        {
 +            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
 +        }
 +
 +        @VisibleForTesting
 +        protected int getRandomInt(int bound)
 +        {
 +            return ThreadLocalRandom.current().nextInt(bound);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
index 13ff81a,0000000..dd19f19
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@@ -1,196 -1,0 +1,196 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.batchlog;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.UUIDType;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.exceptions.WriteFailureException;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.AbstractWriteResponseHandler;
 +import org.apache.cassandra.service.WriteResponseHandler;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +public final class LegacyBatchlogMigrator
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class);
 +
 +    private LegacyBatchlogMigrator()
 +    {
 +        // static-only utility class: the private constructor prevents instantiation
 +    }
 +
 +    /**
 +     * Reads every row of the legacy batchlog table, converts each one through
 +     * {@code apply} and, if at least one row was converted, truncates the legacy table.
 +     * Returns immediately when the legacy table is empty.
 +     */
 +    @SuppressWarnings("deprecation")
 +    public static void migrate()
 +    {
 +        ColumnFamilyStore legacyStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG);
 +        if (legacyStore.isEmpty())
 +            return; // nothing to migrate
 +
 +        logger.info("Migrating legacy batchlog to new storage");
 +
 +        String select = String.format("SELECT id, data, written_at, version FROM %s.%s",
 +                                      SystemKeyspace.NAME,
 +                                      SystemKeyspace.LEGACY_BATCHLOG);
 +        UntypedResultSet legacyRows = QueryProcessor.executeInternalWithPaging(select, BatchlogManager.calculatePageSize(legacyStore));
 +
 +        // the running count of converted batches is also the counter passed to apply()
 +        int converted = 0;
 +        for (UntypedResultSet.Row legacyRow : legacyRows)
 +        {
 +            boolean stored = apply(legacyRow, converted);
 +            if (stored)
 +                converted++;
 +        }
 +
 +        if (converted > 0)
 +            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
 +    }
 +
 +    @SuppressWarnings("deprecation")
 +    public static boolean isLegacyBatchlogMutation(Mutation mutation)
 +    {
 +        return mutation.getKeyspaceName().equals(SystemKeyspace.NAME)
 +            && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null;
 +    }
 +
 +    @SuppressWarnings("deprecation")
 +    public static void handleLegacyMutation(Mutation mutation) // runs every row of the mutation's legacy batchlog update through apply()
 +    {
 +        PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId);
-         logger.debug("Applying legacy batchlog mutation {}", update);
++        logger.trace("Applying legacy batchlog mutation {}", update);
 +        update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1)); // counter is always -1 on this path; apply()'s result is ignored
 +    }
 +
 +    private static boolean apply(UntypedResultSet.Row row, long counter) // converts one legacy row into a Batch and stores it; returns false if anything fails
 +    {
 +        UUID id = row.getUUID("id");
 +        long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at"); // version-1 ids supply the timestamp themselves, others fall back to written_at
 +        int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; // rows without a version column are read as VERSION_12
 +
 +        if (id.version() != 1)
 +            id = UUIDGen.getTimeUUID(timestamp, counter); // replace the id with a time UUID built from the timestamp and the counter
 +
-         logger.debug("Converting mutation at {}", timestamp);
++        logger.trace("Converting mutation at {}", timestamp);
 +
 +        try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false))
 +        {
 +            int numMutations = in.readInt(); // "data" begins with the mutation count, followed by that many serialized mutations
 +            List<Mutation> mutations = new ArrayList<>(numMutations);
 +            for (int i = 0; i < numMutations; i++)
 +                mutations.add(Mutation.serializer.deserialize(in, version));
 +
 +            BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations)); // timestamp is in millis here, the batch is created with micros
 +            return true;
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t);
 +            return false; // the failure is logged and reported through the return value, not rethrown
 +        }
 +    }
 +
 +    public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints) // sends a legacy-format store mutation for the batch to every endpoint
 +    throws WriteTimeoutException, WriteFailureException
 +    {
 +        for (InetAddress target : endpoints)
 +        {
-             logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
++            logger.trace("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
 +
 +            int targetVersion = MessagingService.instance().getVersion(target); // the mutation is built per target, using that target's messaging version
 +            MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION),
 +                                               target,
 +                                               handler,
 +                                               false);
 +        }
 +        // NOTE(review): handler is not awaited in this method - presumably the caller waits on it; confirm
 +    }
 +
 +    public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) // sends a legacy batchlog partition delete for uuid to every endpoint
 +    {
 +        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
 +                                                                                     Collections.<InetAddress>emptyList(),
 +                                                                                     ConsistencyLevel.ANY,
 +                                                                                     Keyspace.open(SystemKeyspace.NAME),
 +                                                                                     null,
 +                                                                                     WriteType.SIMPLE);
 +        Mutation mutation = getRemoveMutation(uuid); // built once and sent unchanged to all targets
 +
 +        for (InetAddress target : endpoints)
 +        {
-             logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target);
++            logger.trace("Sending legacy batchlog remove request {} to {}", uuid, target);
 +            MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false); // the handler is only passed along, never awaited here
 +        }
 +    }
 +
 +    /**
 +     * Builds the legacy-table store mutation for the batch, serialized with the
 +     * given version, and applies it.
 +     */
 +    static void store(Batch batch, int version)
 +    {
 +        Mutation storeMutation = getStoreMutation(batch, version);
 +        storeMutation.apply();
 +    }
 +
 +    /**
 +     * Creates the mutation that writes the batch into the legacy batchlog table:
 +     * {@code written_at}, the serialized mutations ({@code data}) and the {@code version}.
 +     */
 +    @SuppressWarnings("deprecation")
 +    static Mutation getStoreMutation(Batch batch, int version)
 +    {
 +        // creationTime is divided by 1000 before being turned into the written_at Date
 +        long writtenAtMillis = batch.creationTime / 1000;
 +        ByteBuffer serialized = getSerializedMutations(version, batch.decodedMutations);
 +
 +        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id)
 +               .clustering()
 +               .add("written_at", new Date(writtenAtMillis))
 +               .add("data", serialized)
 +               .add("version", version)
 +               .build();
 +    }
 +
 +    /**
 +     * Creates a mutation that deletes the whole legacy batchlog partition keyed by the uuid,
 +     * stamped with the current time.
 +     */
 +    @SuppressWarnings("deprecation")
 +    private static Mutation getRemoveMutation(UUID uuid)
 +    {
 +        ByteBuffer partitionKey = UUIDType.instance.decompose(uuid);
 +        long deletionTimestamp = FBUtilities.timestampMicros();
 +        int nowInSeconds = FBUtilities.nowInSeconds();
 +
 +        PartitionUpdate deletion = PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog,
 +                                                                       partitionKey,
 +                                                                       deletionTimestamp,
 +                                                                       nowInSeconds);
 +        return new Mutation(deletion);
 +    }
 +
 +    /**
 +     * Serializes the mutations into one buffer: an int count followed by each mutation
 +     * written with the given version.
 +     *
 +     * @throws RuntimeException wrapping any IOException raised while writing
 +     */
 +    private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations)
 +    {
 +        try (DataOutputBuffer out = new DataOutputBuffer())
 +        {
 +            out.writeInt(mutations.size());
 +
 +            Iterator<Mutation> remaining = mutations.iterator();
 +            while (remaining.hasNext())
 +                Mutation.serializer.serialize(remaining.next(), out, version);
 +
 +            return out.buffer();
 +        }
 +        catch (IOException ioe)
 +        {
 +            throw new RuntimeException(ioe);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index 2aafeb9,0000000..1db13e3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@@ -1,628 -1,0 +1,628 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3.functions;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.lang.invoke.MethodHandle;
 +import java.lang.invoke.MethodHandles;
 +import java.lang.invoke.MethodType;
 +import java.lang.reflect.InvocationTargetException;
 +import java.net.*;
 +import java.nio.ByteBuffer;
 +import java.security.*;
 +import java.security.cert.Certificate;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.io.ByteStreams;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.datastax.driver.core.DataType;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.eclipse.jdt.core.compiler.IProblem;
 +import org.eclipse.jdt.internal.compiler.*;
 +import org.eclipse.jdt.internal.compiler.Compiler;
 +import org.eclipse.jdt.internal.compiler.classfmt.ClassFileReader;
 +import org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException;
 +import org.eclipse.jdt.internal.compiler.env.ICompilationUnit;
 +import org.eclipse.jdt.internal.compiler.env.INameEnvironment;
 +import org.eclipse.jdt.internal.compiler.env.NameEnvironmentAnswer;
 +import org.eclipse.jdt.internal.compiler.impl.CompilerOptions;
 +import org.eclipse.jdt.internal.compiler.problem.DefaultProblemFactory;
 +
 +final class JavaBasedUDFunction extends UDFunction
 +{
 +    private static final String BASE_PACKAGE = "org.apache.cassandra.cql3.udf.gen";
 +
 +    static final Logger logger = LoggerFactory.getLogger(JavaBasedUDFunction.class);
 +
 +    private static final AtomicInteger classSequence = new AtomicInteger();
 +
 +    // use a JVM standard ExecutorService as DebuggableThreadPoolExecutor references internal
 +    // classes, which triggers AccessControlException from the UDF sandbox
 +    private static final UDFExecutorService executor =
 +        new UDFExecutorService(new NamedThreadFactory("UserDefinedFunctions",
 +                                                      Thread.MIN_PRIORITY,
 +                                                      udfClassLoader,
 +                                                      new SecurityThreadGroup("UserDefinedFunctions", null)),
 +                               "userfunction");
 +
 +    private static final EcjTargetClassLoader targetClassLoader = new EcjTargetClassLoader();
 +
 +    private static final UDFByteCodeVerifier udfByteCodeVerifier = new UDFByteCodeVerifier();
 +
 +    private static final ProtectionDomain protectionDomain;
 +
 +    private static final IErrorHandlingPolicy errorHandlingPolicy = DefaultErrorHandlingPolicies.proceedWithAllProblems();
 +    private static final IProblemFactory problemFactory = new DefaultProblemFactory(Locale.ENGLISH);
 +    private static final CompilerOptions compilerOptions;
 +
 +    /**
 +     * Poor man's template - just a text file split at '#' chars.
 +     * Each string at an even index is a constant string (just copied),
 +     * each string at an odd index is an 'instruction'.
 +     */
 +    private static final String[] javaSourceTemplate;
 +
 +    static // one-time setup: verifier deny-list, compiler options, source template, protection domain
 +    {
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "forName"); // this and the following lines register method calls the byte-code verifier must flag
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getClassLoader");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResource");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResourceAsStream");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "clearAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResource");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResourceAsStream");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResources");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemClassLoader");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResource");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResourceAsStream");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResources");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "loadClass");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setClassAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setDefaultAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setPackageAssertionStatus");
 +        udfByteCodeVerifier.addDisallowedMethodCall("java/nio/ByteBuffer", "allocateDirect");
 +
 +        Map<String, String> settings = new HashMap<>(); // ECJ settings: line numbers on, source-file attribute off, deprecation ignored, Java 1.8 source and target
 +        settings.put(CompilerOptions.OPTION_LineNumberAttribute,
 +                     CompilerOptions.GENERATE);
 +        settings.put(CompilerOptions.OPTION_SourceFileAttribute,
 +                     CompilerOptions.DISABLED);
 +        settings.put(CompilerOptions.OPTION_ReportDeprecation,
 +                     CompilerOptions.IGNORE);
 +        settings.put(CompilerOptions.OPTION_Source,
 +                     CompilerOptions.VERSION_1_8);
 +        settings.put(CompilerOptions.OPTION_TargetPlatform,
 +                     CompilerOptions.VERSION_1_8);
 +
 +        compilerOptions = new CompilerOptions(settings);
 +        compilerOptions.parseLiteralExpressionsAsConstants = true;
 +
 +        try (InputStream input = JavaBasedUDFunction.class.getResource("JavaSourceUDF.txt").openConnection().getInputStream()) // template is a class-path resource resolved relative to this class
 +        {
 +            ByteArrayOutputStream output = new ByteArrayOutputStream();
 +            FBUtilities.copy(input, output, Long.MAX_VALUE);
 +            String template = output.toString(); // NOTE(review): decodes with the platform default charset - confirm the template is plain ASCII
 +
 +            StringTokenizer st = new StringTokenizer(template, "#"); // tokens alternate between literal text (even index) and instruction names (odd index)
 +            javaSourceTemplate = new String[st.countTokens()];
 +            for (int i = 0; st.hasMoreElements(); i++)
 +                javaSourceTemplate[i] = st.nextToken();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        CodeSource codeSource;
 +        try
 +        {
 +            codeSource = new CodeSource(new URL("udf", "localhost", 0, "/java", new URLStreamHandler() // synthetic "udf" URL whose handler never opens a connection
 +            {
 +                protected URLConnection openConnection(URL u)
 +                {
 +                    return null;
 +                }
 +            }), (Certificate[])null);
 +        }
 +        catch (MalformedURLException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        protectionDomain = new ProtectionDomain(codeSource, ThreadAwareSecurityManager.noPermissions, targetClassLoader, null); // domain combines the synthetic code source with the noPermissions set
 +    }
 +
 +    private final JavaUDF javaUDF;
 +
 +    JavaBasedUDFunction(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes, // generates, compiles, verifies and instantiates the Java class for this UDF
 +                        AbstractType<?> returnType, boolean calledOnNullInput, String body)
 +    {
 +        super(name, argNames, argTypes, UDHelper.driverTypes(argTypes),
 +              returnType, UDHelper.driverType(returnType), calledOnNullInput, "java", body);
 +
 +        // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
 +        Class<?>[] javaParamTypes = UDHelper.javaTypes(argDataTypes, calledOnNullInput);
 +        // javaReturnType is just the Java representation for returnType resp. returnDataType
 +        Class<?> javaReturnType = UDHelper.asJavaClass(returnDataType);
 +
 +        // put each UDF in a separate package to prevent cross-UDF code access
 +        String pkgName = BASE_PACKAGE + '.' + generateClassName(name, 'p');
 +        String clsName = generateClassName(name, 'C');
 +
 +        String executeInternalName = generateClassName(name, 'x');
 +
 +        StringBuilder javaSourceBuilder = new StringBuilder();
 +        int lineOffset = 1;
 +        for (int i = 0; i < javaSourceTemplate.length; i++)
 +        {
 +            String s = javaSourceTemplate[i];
 +
 +            // strings at odd indexes are 'instructions'
 +            if ((i & 1) == 1)
 +            {
 +                switch (s) // an unrecognised instruction falls through the switch and is appended verbatim
 +                {
 +                    case "package_name":
 +                        s = pkgName;
 +                        break;
 +                    case "class_name":
 +                        s = clsName;
 +                        break;
 +                    case "body":
 +                        lineOffset = countNewlines(javaSourceBuilder); // newlines emitted before the user body; used below to map compiler line numbers back to the body
 +                        s = body;
 +                        break;
 +                    case "arguments":
 +                        s = generateArguments(javaParamTypes, argNames);
 +                        break;
 +                    case "argument_list":
 +                        s = generateArgumentList(javaParamTypes, argNames);
 +                        break;
 +                    case "return_type":
 +                        s = javaSourceName(javaReturnType);
 +                        break;
 +                    case "execute_internal_name":
 +                        s = executeInternalName;
 +                        break;
 +                }
 +            }
 +
 +            javaSourceBuilder.append(s);
 +        }
 +
 +        String targetClassName = pkgName + '.' + clsName;
 +
 +        String javaSource = javaSourceBuilder.toString();
 +
-         logger.debug("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource);
++        logger.trace("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource);
 +
 +        try
 +        {
 +            EcjCompilationUnit compilationUnit = new EcjCompilationUnit(javaSource, targetClassName);
 +
 +            org.eclipse.jdt.internal.compiler.Compiler compiler = new Compiler(compilationUnit, // the one compilation unit is passed as both name environment and requestor
 +                                                                               errorHandlingPolicy,
 +                                                                               compilerOptions,
 +                                                                               compilationUnit,
 +                                                                               problemFactory);
 +            compiler.compile(new ICompilationUnit[]{ compilationUnit });
 +
 +            if (compilationUnit.problemList != null && !compilationUnit.problemList.isEmpty())
 +            {
 +                boolean fullSource = false;
 +                StringBuilder problems = new StringBuilder();
 +                for (IProblem problem : compilationUnit.problemList)
 +                {
 +                    long ln = problem.getSourceLineNumber() - lineOffset; // line number relative to the user-supplied body
 +                    if (ln < 1L)
 +                    {
 +                        if (problem.isError()) // non-error problems located before the body are not reported
 +                        {
 +                            // if generated source around UDF source provided by the user is buggy,
 +                            // this code is appended.
 +                            problems.append("GENERATED SOURCE ERROR: line ")
 +                                    .append(problem.getSourceLineNumber())
 +                                    .append(" (in generated source): ")
 +                                    .append(problem.getMessage())
 +                                    .append('\n');
 +                            fullSource = true;
 +                        }
 +                    }
 +                    else
 +                    {
 +                        problems.append("Line ")
 +                                .append(Long.toString(ln))
 +                                .append(": ")
 +                                .append(problem.getMessage())
 +                                .append('\n');
 +                    }
 +                }
 +
 +                if (fullSource)
 +                    throw new InvalidRequestException("Java source compilation failed:\n" + problems + "\n generated source:\n" + javaSource);
 +                else
 +                    throw new InvalidRequestException("Java source compilation failed:\n" + problems);
 +            }
 +
 +            // Verify the UDF bytecode against use of probably dangerous code
 +            Set<String> errors = udfByteCodeVerifier.verify(targetClassLoader.classData(targetClassName));
 +            String validDeclare = "not allowed method declared: " + executeInternalName + '(';
 +            String validCall = "call to " + targetClassName.replace('.', '/') + '.' + executeInternalName + "()";
 +            for (Iterator<String> i = errors.iterator(); i.hasNext();)
 +            {
 +                String error = i.next();
 +                // we generate a random name of the private, internal execute method, which is detected by the byte-code verifier
 +                if (error.startsWith(validDeclare) || error.equals(validCall))
 +                {
 +                    i.remove();
 +                }
 +            }
 +            if (!errors.isEmpty())
 +                throw new InvalidRequestException("Java UDF validation failed: " + errors);
 +
 +            // Load the class and create a new instance of it
 +            Thread thread = Thread.currentThread();
 +            ClassLoader orig = thread.getContextClassLoader();
 +            try
 +            {
 +                thread.setContextClassLoader(UDFunction.udfClassLoader);
 +                // Execute UDF initialization from UDF class loader
 +
 +                Class cls = Class.forName(targetClassName, false, targetClassLoader);
 +
 +                if (cls.getDeclaredMethods().length != 2 || cls.getDeclaredConstructors().length != 1) // the generated class must contain exactly two methods and one constructor
 +                    throw new InvalidRequestException("Check your source to not define additional Java methods or constructors");
 +                MethodType methodType = MethodType.methodType(void.class)
 +                                                  .appendParameterTypes(DataType.class, DataType[].class);
 +                MethodHandle ctor = MethodHandles.lookup().findConstructor(cls, methodType);
 +                this.javaUDF = (JavaUDF) ctor.invokeWithArguments(returnDataType, argDataTypes);
 +            }
 +            finally
 +            {
 +                thread.setContextClassLoader(orig); // always restore the caller's context class loader
 +            }
 +        }
 +        catch (InvocationTargetException e)
 +        {
 +            // in case of an ITE, use the cause
 +            throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e.getCause()));
 +        }
 +        catch (VirtualMachineError e)
 +        {
 +            throw e; // VM errors are rethrown as-is instead of being wrapped below
 +        }
 +        catch (Throwable e)
 +        {
 +            throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e)); // also re-wraps the InvalidRequestExceptions thrown inside the try block
 +        }
 +    }
 +
 +    protected ExecutorService executor() // returns the static executor that all instances of this class share
 +    {
 +        return executor;
 +    }
 +
 +    /**
 +     * Runs the compiled UDF instance with the given serialized parameters and returns
 +     * whatever its {@code executeImpl} produces.
 +     */
 +    protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params)
 +    {
 +        ByteBuffer result = javaUDF.executeImpl(protocolVersion, params);
 +        return result;
 +    }
 +
 +
 +    /**
 +     * Counts the {@code '\n'} characters contained in the given source text.
 +     */
 +    private static int countNewlines(StringBuilder javaSource)
 +    {
 +        int newlines = 0;
 +        // hop from one newline to the next instead of inspecting every character
 +        int at = javaSource.indexOf("\n");
 +        while (at >= 0)
 +        {
 +            newlines++;
 +            at = javaSource.indexOf("\n", at + 1);
 +        }
 +        return newlines;
 +    }
 +
 +    /**
 +     * Builds a Java identifier from the function name: the prefix char, the name with every
 +     * character that is not a valid identifier part replaced by its hex code, then
 +     * {@code _<random>_<sequence>}.
 +     */
 +    private static String generateClassName(FunctionName name, char prefix)
 +    {
 +        String qualified = name.toString();
 +
 +        StringBuilder generated = new StringBuilder(qualified.length() + 10).append(prefix);
 +        for (char ch : qualified.toCharArray())
 +        {
 +            if (Character.isJavaIdentifierPart(ch))
 +                generated.append(ch);
 +            else
 +                generated.append(Integer.toHexString(((short) ch) & 0xffff));
 +        }
 +
 +        // 24 random bits plus a class-wide sequence number
 +        int randomPart = ThreadLocalRandom.current().nextInt() & 0xffffff;
 +        int sequencePart = classSequence.incrementAndGet();
 +        generated.append('_').append(randomPart).append('_').append(sequencePart);
 +        return generated.toString();
 +    }
 +
 +    /**
 +     * Returns the name used for the type in generated source: the simple name when the
 +     * class name starts with {@code java.lang.}, the full class name otherwise.
 +     */
 +    private static String javaSourceName(Class<?> type)
 +    {
 +        String fullName = type.getName();
 +        if (fullName.startsWith("java.lang."))
 +            return type.getSimpleName();
 +        return fullName;
 +    }
 +
 +    /**
 +     * Renders the parameter declarations ({@code <type> <name>}) separated by
 +     * {@code ", "}, one per entry of paramTypes.
 +     */
 +    private static String generateArgumentList(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
 +    {
 +        StringJoiner declarations = new StringJoiner(", ");
 +        for (int idx = 0; idx < paramTypes.length; idx++)
 +            declarations.add(javaSourceName(paramTypes[idx]) + ' ' + argNames.get(idx));
 +        return declarations.toString();
 +    }
 +
 +    private static String generateArguments(Class<?>[] paramTypes, List<ColumnIdentifier> argNames) // renders one cast-and-compose expression per parameter, joined by ",\n"
 +    {
 +        StringBuilder code = new StringBuilder(64 * paramTypes.length);
 +        for (int i = 0; i < paramTypes.length; i++)
 +        {
 +            if (i > 0)
 +                code.append(",\n");
 +
-             if (logger.isDebugEnabled())
++            if (logger.isTraceEnabled())
 +                code.append("            /* parameter '").append(argNames.get(i)).append("' */\n"); // the parameter-name comment is only generated when trace logging is on
 +
 +            code
 +                // cast to Java type
 +                .append("            (").append(javaSourceName(paramTypes[i])).append(") ")
 +                // generate object representation of input parameter (call UDFunction.compose)
 +                .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))");
 +        }
 +        return code.toString();
 +    }
 +
 +    /**
 +     * Picks the compose call emitted for a parameter: {@code super.compose_<primitive>}
 +     * for primitive types, plain {@code super.compose} for everything else.
 +     */
 +    private static String composeMethod(Class<?> type)
 +    {
 +        if (!type.isPrimitive())
 +            return "super.compose";
 +        return "super.compose_" + type.getName();
 +    }
 +
 +    // Java source UDFs are a very simple compilation task, which allows us to let one class implement
 +    // all interfaces required by ECJ.
 +    static final class EcjCompilationUnit implements ICompilationUnit, ICompilerRequestor, INameEnvironment
 +    {
 +        List<IProblem> problemList;
 +        private final String className;
 +        private final char[] sourceCode;
 +
 +        /**
 +         * @param sourceCode complete Java source to compile
 +         * @param className  fully qualified name of the class defined by that source
 +         */
 +        EcjCompilationUnit(String sourceCode, String className)
 +        {
 +            // the source is kept as a char[], the form returned by getContents()
 +            char[] sourceChars = sourceCode.toCharArray();
 +            this.sourceCode = sourceChars;
 +            this.className = className;
 +        }
 +
 +        // ICompilationUnit
 +
 +        @Override
 +        public char[] getFileName()
 +        {
 +            return sourceCode; // NOTE(review): this is the full source text (same array as getContents()), not a file name - confirm it is intended
 +        }
 +
 +        @Override
 +        public char[] getContents()
 +        {
 +            return sourceCode; // the source passed to the constructor, as a char array (not copied)
 +        }
 +
 +        /**
 +         * Returns the part of the class name after its last dot, or the whole class name
 +         * when there is no dot past index 0.
 +         */
 +        @Override
 +        public char[] getMainTypeName()
 +        {
 +            int lastDot = className.lastIndexOf('.');
 +            String simpleName = className;
 +            if (lastDot > 0)
 +                simpleName = className.substring(lastDot + 1);
 +            return simpleName.toCharArray();
 +        }
 +
 +        /**
 +         * Returns the dot-separated segments of the class name, without the last one.
 +         */
 +        @Override
 +        public char[][] getPackageName()
 +        {
 +            StringTokenizer segments = new StringTokenizer(className, ".");
 +            // every segment but the last belongs to the package
 +            int packageDepth = segments.countTokens() - 1;
 +            char[][] packageName = new char[packageDepth][];
 +            int filled = 0;
 +            while (filled < packageDepth)
 +            {
 +                packageName[filled] = segments.nextToken().toCharArray();
 +                filled++;
 +            }
 +            return packageName;
 +        }
 +
 +        @Override
 +        public boolean ignoreOptionalProblems()
 +        {
 +            return false; // constant answer: this unit never asks for optional problems to be ignored
 +        }
 +
 +        // ICompilerRequestor
 +
 +        @Override
 +        public void acceptResult(CompilationResult result)
 +        {
 +            if (result.hasErrors())
 +            {
 +                IProblem[] problems = result.getProblems();
 +                if (problemList == null)
 +                    problemList = new ArrayList<>(problems.length);
 +                Collections.addAll(problemList, problems);
 +            }
 +            else
 +            {
 +                ClassFile[] classFiles = result.getClassFiles();
 +                for (ClassFile classFile : classFiles)
 +                    targetClassLoader.addClass(className, classFile.getBytes());
 +            }
 +        }
 +
 +        // INameEnvironment
 +
 +        /**
 +         * Joins the name parts with dots and looks the resulting class name up.
 +         */
 +        @Override
 +        public NameEnvironmentAnswer findType(char[][] compoundTypeName)
 +        {
 +            StringBuilder qualifiedName = new StringBuilder();
 +            String separator = "";
 +            for (char[] part : compoundTypeName)
 +            {
 +                qualifiedName.append(separator).append(part);
 +                separator = ".";
 +            }
 +            return findType(qualifiedName.toString());
 +        }
 +
 +        /**
 +         * Looks up a type given as package segments plus simple name: builds the dotted name and delegates
 +         * to the String-based lookup.
 +         */
 +        @Override
 +        public NameEnvironmentAnswer findType(char[] typeName, char[][] packageName)
 +        {
 +            StringBuilder qualified = new StringBuilder();
 +            // each package segment is followed by a '.', so the type name can simply be appended afterwards
 +            for (char[] segment : packageName)
 +                qualified.append(segment).append('.');
 +            qualified.append(typeName);
 +            return findType(qualified.toString());
 +        }
 +
 +        /**
 +         * Resolves a dotted class name. The class being compiled is answered with this unit itself; any other
 +         * name is looked up as a {@code .class} resource through {@code UDFunction.udfClassLoader}.
 +         *
 +         * @return the answer for the type, or {@code null} if no such resource exists
 +         * @throws RuntimeException wrapping an {@code IOException} or {@code ClassFormatException} raised while reading the class
 +         */
 +        private NameEnvironmentAnswer findType(String className)
 +        {
 +            if (className.equals(this.className))
 +                return new NameEnvironmentAnswer(this, null);
 +
 +            String classResource = className.replace('.', '/') + ".class";
 +            try (InputStream in = UDFunction.udfClassLoader.getResourceAsStream(classResource))
 +            {
 +                if (in == null)
 +                    return null;
 +
 +                ClassFileReader reader = new ClassFileReader(ByteStreams.toByteArray(in), className.toCharArray(), true);
 +                return new NameEnvironmentAnswer(reader, null);
 +            }
 +            catch (IOException | ClassFormatException exc)
 +            {
 +                throw new RuntimeException(exc);
 +            }
 +        }
 +
 +        /**
 +         * Treats a dotted name as a package when it is neither the class being compiled nor resolvable as a
 +         * {@code .class} resource through {@code UDFunction.udfClassLoader}.
 +         */
 +        private boolean isPackage(String result)
 +        {
 +            if (result.equals(this.className))
 +                return false;
 +
 +            String classResource = result.replace('.', '/') + ".class";
 +            try (InputStream in = UDFunction.udfClassLoader.getResourceAsStream(classResource))
 +            {
 +                boolean classResourceMissing = (in == null);
 +                return classResourceMissing;
 +            }
 +            catch (IOException e)
 +            {
 +                // only close() can fail here, which implies the stream existed - so the name is a class
 +                return false;
 +            }
 +        }
 +
 +        /**
 +         * Answers whether {@code packageName}, nested in {@code parentPackageName}, denotes a package.
 +         *
 +         * @param parentPackageName segments of the enclosing package; may be {@code null}
 +         * @param packageName       the candidate name
 +         */
 +        @Override
 +        public boolean isPackage(char[][] parentPackageName, char[] packageName)
 +        {
 +            StringBuilder name = new StringBuilder();
 +            boolean hasParent = parentPackageName != null && parentPackageName.length > 0;
 +            if (hasParent)
 +            {
 +                name.append(parentPackageName[0]);
 +                for (int idx = 1; idx < parentPackageName.length; idx++)
 +                    name.append('.').append(parentPackageName[idx]);
 +            }
 +
 +            // a name starting with an upper-case letter whose parent is not a package is rejected right away
 +            if (Character.isUpperCase(packageName[0]) && !isPackage(name.toString()))
 +                return false;
 +
 +            if (hasParent)
 +                name.append('.');
 +            name.append(packageName);
 +
 +            return isPackage(name.toString());
 +        }
 +
 +        /**
 +         * No-op.
 +         */
 +        @Override
 +        public void cleanup()
 +        {
 +            // intentionally empty
 +        }
 +    }
 +
 +    /**
 +     * Class loader (child of {@code UDFunction.udfClassLoader}) that defines classes from byte code registered
 +     * through {@link #addClass(String, byte[])}.
 +     */
 +    static final class EcjTargetClassLoader extends SecureClassLoader
 +    {
 +        // Normally empty: entries exist only *while* a UDF is being compiled, never at runtime.
 +        //
 +        // ECJ calls addClass() after it successfully compiled the generated Java source;
 +        // buildUDF() calls loadClass(targetClassName) once ECJ has returned from that compilation.
 +        //
 +        private final Map<String, byte[]> classes = new ConcurrentHashMap<>();
 +
 +        EcjTargetClassLoader()
 +        {
 +            super(UDFunction.udfClassLoader);
 +        }
 +
 +        /** Registers the byte code of a class under its name. */
 +        void addClass(String className, byte[] classData)
 +        {
 +            classes.put(className, classData);
 +        }
 +
 +        /** Returns the registered byte code for the name, or {@code null} if there is none. */
 +        byte[] classData(String className)
 +        {
 +            return classes.get(className);
 +        }
 +
 +        /**
 +         * Defines the class from its registered byte code; names without registered byte code are passed
 +         * to the parent class loader.
 +         */
 +        @Override
 +        protected Class<?> findClass(String name) throws ClassNotFoundException
 +        {
 +            // the byte code is needed exactly once - take it out of the map so it does not occupy heap
 +            byte[] byteCode = classes.remove(name);
 +            if (byteCode == null)
 +                return getParent().loadClass(name);
 +
 +            return defineClass(name, byteCode, 0, byteCode.length, protectionDomain);
 +        }
 +
 +        /** Always answers {@code ThreadAwareSecurityManager.noPermissions}, whatever the code source. */
 +        @Override
 +        protected PermissionCollection getPermissions(CodeSource codesource)
 +        {
 +            return ThreadAwareSecurityManager.noPermissions;
 +        }
 +    }}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index e21d8af,1e5cea6..a07852d
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -263,160 -143,11 +263,160 @@@ public abstract class UDFunction extend
              return null;
  
          long tStart = System.nanoTime();
 -        ByteBuffer result = executeUserDefined(protocolVersion, parameters);
 -        Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
 -        return result;
 +        parameters = makeEmptyParametersNull(parameters);
 +
 +        try
 +        {
 +            // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr).
 +            ByteBuffer result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
 +                                ? executeAsync(protocolVersion, parameters)
 +                                : executeUserDefined(protocolVersion, parameters);
 +
 +            Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
 +            return result;
 +        }
 +        catch (InvalidRequestException e)
 +        {
 +            throw e;
 +        }
 +        catch (Throwable t)
 +        {
-             logger.debug("Invocation of user-defined function '{}' failed", this, t);
++            logger.trace("Invocation of user-defined function '{}' failed", this, t);
 +            if (t instanceof VirtualMachineError)
 +                throw (VirtualMachineError) t;
 +            throw FunctionExecutionException.create(this, t);
 +        }
 +    }
 +
 +    /**
 +     * Verifies that user-defined functions in the given language may be used.
 +     *
 +     * @param language UDF language; anything other than "java" (case-insensitive) counts as scripted
 +     * @throws InvalidRequestException if UDFs are disabled, or the language is scripted and scripted UDFs are disabled
 +     */
 +    public static void assertUdfsEnabled(String language)
 +    {
 +        if (!DatabaseDescriptor.enableUserDefinedFunctions())
 +            throw new InvalidRequestException("User-defined functions are disabled in cassandra.yaml - set enable_user_defined_functions=true to enable");
 +
 +        boolean scripted = !"java".equalsIgnoreCase(language);
 +        if (scripted && !DatabaseDescriptor.enableScriptedUserDefinedFunctions())
 +            throw new InvalidRequestException("Scripted user-defined functions are disabled in cassandra.yaml - set enable_scripted_user_defined_functions=true to enable if you are aware of the security risks");
 +    }
 +
 +    /**
 +     * Future that completes once the thread calling {@link #setup()} has published its id and its CPU time
 +     * at that moment.
 +     */
 +    private static final class ThreadIdAndCpuTime extends CompletableFuture<Object>
 +    {
 +        long threadId;
 +        long cpuTime;
 +
 +        ThreadIdAndCpuTime()
 +        {
 +            // The calls below are deliberate warm-ups and their results are discarded.
 +            //
 +            // Touching threadMXBean here links this class to java.lang.management up front. Without it, class
 +            // loading would be deferred until setup() runs - and setup() is called with limited privileges,
 +            // so UDFs (script UDFs) might fail with
 +            //      java.security.AccessControlException: access denied: ("java.lang.RuntimePermission" "accessClassInPackage.java.lang.management")
 +            threadMXBean.getCurrentThreadCpuTime();
 +
 +            // Initialize the TypeCodec machinery of the Java Driver.
 +            UDHelper.codecRegistry.codecFor(DataType.inet()).format(InetAddress.getLoopbackAddress());
 +            UDHelper.codecRegistry.codecFor(DataType.ascii()).format("");
 +        }
 +
 +        /**
 +         * Records id and current CPU time of the calling thread, then completes this future.
 +         */
 +        void setup()
 +        {
 +            threadId = Thread.currentThread().getId();
 +            cpuTime = threadMXBean.getCurrentThreadCpuTime();
 +            complete(null);
 +        }
 +    }
 +
 +    /**
 +     * Executes the UDF on the pool supplied by {@link #executor()} and waits for its result in up to three stages:
 +     * <ol>
 +     *   <li>if a warn timeout is configured (&gt; 0), wait that long; on expiry log and send a client warning;</li>
 +     *   <li>wait for the difference between the fail timeout and the warn timeout;</li>
 +     *   <li>wait once more for the fail timeout minus the CPU time the worker thread has consumed since it
 +     *       started the call (never less than 0).</li>
 +     * </ol>
 +     * If the last wait expires as well, a {@link FunctionExecutionException} is created,
 +     * {@code JVMStabilityInspector.userFunctionTimeout} is notified and the exception is thrown.
 +     *
 +     * @param protocolVersion passed through to {@code executeUserDefined}
 +     * @param parameters      passed through to {@code executeUserDefined}
 +     * @return the value produced by {@code executeUserDefined}
 +     * @throws RuntimeException if the waiting thread is interrupted (the interrupt flag is restored) or the UDF
 +     *                          failed; a {@code RuntimeException} cause is rethrown as is, any other cause is wrapped
 +     */
 +    private ByteBuffer executeAsync(int protocolVersion, List<ByteBuffer> parameters)
 +    {
 +        ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
 +
 +        Future<ByteBuffer> future = executor().submit(() -> {
 +            threadIdAndCpuTime.setup();
 +            return executeUserDefined(protocolVersion, parameters);
 +        });
 +
 +        try
 +        {
 +            if (DatabaseDescriptor.getUserDefinedFunctionWarnTimeout() > 0)
 +                try
 +                {
 +                    return future.get(DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
 +                }
 +                catch (TimeoutException e)
 +                {
 +                    // log and emit a warning that UDF execution took long
 +                    String warn = String.format("User defined function %s ran longer than %dms", this, DatabaseDescriptor.getUserDefinedFunctionWarnTimeout());
 +                    logger.warn(warn);
 +                    ClientWarn.warn(warn);
 +                }
 +
 +            // retry with difference of warn-timeout to fail-timeout
 +            return future.get(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            Thread.currentThread().interrupt();
 +            throw new RuntimeException(e);
 +        }
 +        catch (ExecutionException e)
 +        {
 +            Throwable c = e.getCause();
 +            if (c instanceof RuntimeException)
 +                throw (RuntimeException) c;
 +            throw new RuntimeException(c);
 +        }
 +        catch (TimeoutException e)
 +        {
 +            // retry a last time with the difference of UDF-fail-timeout to consumed CPU time (just in case execution hit a badly timed GC)
 +            try
 +            {
 +                //The threadIdAndCpuTime shouldn't take a long time to be set so this should return immediately
 +                threadIdAndCpuTime.get(1, TimeUnit.SECONDS);
 +
 +                long cpuTimeMillis = threadMXBean.getThreadCpuTime(threadIdAndCpuTime.threadId) - threadIdAndCpuTime.cpuTime;
 +                cpuTimeMillis /= 1000000L;
 +
 +                return future.get(Math.max(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - cpuTimeMillis, 0L),
 +                                  TimeUnit.MILLISECONDS);
 +            }
 +            catch (InterruptedException e1)
 +            {
 +                Thread.currentThread().interrupt();
 +                // wrap the interruption itself (e1), not the outer TimeoutException (e)
 +                throw new RuntimeException(e1);
 +            }
 +            catch (ExecutionException e1)
 +            {
 +                // the UDF failure is the cause of e1; the outer TimeoutException (e) carries no cause
 +                Throwable c = e1.getCause();
 +                if (c instanceof RuntimeException)
 +                    throw (RuntimeException) c;
 +                throw new RuntimeException(c);
 +            }
 +            catch (TimeoutException e1)
 +            {
 +                TimeoutException cause = new TimeoutException(String.format("User defined function %s ran longer than %dms%s",
 +                                                                            this,
 +                                                                            DatabaseDescriptor.getUserDefinedFunctionFailTimeout(),
 +                                                                            DatabaseDescriptor.getUserFunctionTimeoutPolicy() == Config.UserFunctionTimeoutPolicy.ignore
 +                                                                            ? "" : " - will stop Cassandra VM"));
 +                FunctionExecutionException fe = FunctionExecutionException.create(this, cause);
 +                JVMStabilityInspector.userFunctionTimeout(cause);
 +                throw fe;
 +            }
 +        }
      }
  
 +    /**
 +     * Returns a copy of the parameter list in which every value that {@code UDHelper.isNullOrEmpty} reports
 +     * for the argument type at the same position is replaced by {@code null}.
 +     *
 +     * @param parameters serialized argument values, positionally matching {@code argTypes}
 +     * @return a new list of the same size
 +     */
 +    private List<ByteBuffer> makeEmptyParametersNull(List<ByteBuffer> parameters)
 +    {
 +        int count = parameters.size();
 +        List<ByteBuffer> normalized = new ArrayList<>(count);
 +        for (int idx = 0; idx < count; idx++)
 +        {
 +            ByteBuffer value = parameters.get(idx);
 +            if (UDHelper.isNullOrEmpty(argTypes.get(idx), value))
 +                normalized.add(null);
 +            else
 +                normalized.add(value);
 +        }
 +        return normalized;
 +    }
 +
 +    /**
 +     * Supplies the executor service to which {@code executeAsync} submits the UDF invocation.
 +     */
 +    protected abstract ExecutorService executor();
 +
      public boolean isCallableWrtNullable(List<ByteBuffer> parameters)
      {
          if (!calledOnNullInput)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 0735103,edc092d..d11d2c5
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@@ -220,22 -186,11 +220,22 @@@ public class CreateIndexStatement exten
          }
          else
          {
 -            cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
 +            indexOptions = Collections.emptyMap();
 +            kind = cfm.isCompound() ? IndexMetadata.Kind.COMPOSITES : IndexMetadata.Kind.KEYS;
          }
  
 -        cd.setIndexName(indexName);
 -        cfm.addDefaultIndexNames();
 +        IndexMetadata index = IndexMetadata.fromIndexTargets(cfm, targets, acceptedName, kind, indexOptions);
 +
 +        // check to disallow creation of an index which duplicates an existing one in all but name
 +        Optional<IndexMetadata> existingIndex = Iterables.tryFind(cfm.getIndexes(), existing -> existing.equalsWithoutName(index));
 +        if (existingIndex.isPresent())
 +            throw new InvalidRequestException(String.format("Index %s is a duplicate of existing index %s",
 +                                                            index.name,
 +                                                            existingIndex.get().name));
 +
-         logger.debug("Updating index definition for {}", indexName);
++        logger.trace("Updating index definition for {}", indexName);
 +        cfm.indexes(cfm.getIndexes().with(index));
 +
          MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly);
          return true;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a9a8f80,4b418b4..062eb0a
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -244,10 -214,10 +244,10 @@@ public class ColumnFamilyStore implemen
  
      void scheduleFlush()
      {
 -        int period = metadata.getMemtableFlushPeriod();
 +        int period = metadata.params.memtableFlushPeriodInMs;
          if (period > 0)
          {
-             logger.debug("scheduling flush in {} ms", period);
+             logger.trace("scheduling flush in {} ms", period);
              WrappedRunnable runnable = new WrappedRunnable()
              {
                  protected void runMayThrow() throws Exception
@@@ -422,7 -422,7 +422,7 @@@
              {
                  throw new RuntimeException(e);
              }
-             logger.debug("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry);
 -            logger.trace("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
++            logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry);
              latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
              {
                  public void run()
@@@ -564,14 -543,36 +564,14 @@@
      {
          Directories directories = new Directories(metadata);
  
 -        // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
 +         // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
          clearEphemeralSnapshots(directories);
  
-         logger.debug("Removing temporary or obsoleted files from unfinished operations for table", metadata.cfName);
 -        // remove any left-behind SSTables from failed/stalled streaming
 -        FileFilter filter = new FileFilter()
 -        {
 -            public boolean accept(File pathname)
 -            {
 -                return pathname.getPath().endsWith(StreamLockfile.FILE_EXT);
 -            }
 -        };
 -        for (File dir : directories.getCFDirectories())
 -        {
 -            File[] lockfiles = dir.listFiles(filter);
 -            // lock files can be null if I/O error happens
 -            if (lockfiles == null || lockfiles.length == 0)
 -                continue;
 -            logger.info("Removing SSTables from failed streaming session. Found {} files to cleanup.", lockfiles.length);
 -
 -            for (File lockfile : lockfiles)
 -            {
 -                StreamLockfile streamLockfile = new StreamLockfile(lockfile);
 -                streamLockfile.cleanup();
 -                streamLockfile.delete();
 -            }
 -        }
 -
 -        logger.trace("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", metadata.cfName);
++        logger.trace("Removing temporary or obsoleted files from unfinished operations for table", metadata.cfName);
 +        LifecycleTransaction.removeUnfinishedLeftovers(metadata);
  
-         logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName);
 -        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
++        logger.trace("Further extra check for orphan sstable files for {}", metadata.cfName);
 +        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())
          {
              Descriptor desc = sstableFiles.getKey();
              Set<Component> components = sstableFiles.getValue();
@@@ -813,16 -904,19 +813,16 @@@
          onHeapTotal += memtable.getAllocator().onHeap().owns();
          offHeapTotal += memtable.getAllocator().offHeap().owns();
  
 -        for (SecondaryIndex index : indexManager.getIndexes())
 +        for (ColumnFamilyStore indexCfs : indexManager.getAllIndexColumnFamilyStores())
          {
 -            if (index.getIndexCfs() != null)
 -            {
 -                MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator();
 -                onHeapRatio += allocator.onHeap().ownershipRatio();
 -                offHeapRatio += allocator.offHeap().ownershipRatio();
 -                onHeapTotal += allocator.onHeap().owns();
 -                offHeapTotal += allocator.offHeap().owns();
 -            }
 +            MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
 +            onHeapRatio += allocator.onHeap().ownershipRatio();
 +            offHeapRatio += allocator.offHeap().ownershipRatio();
 +            onHeapTotal += allocator.onHeap().owns();
 +            offHeapTotal += allocator.offHeap().owns();
          }
  
-         logger.info("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap",
+         logger.debug("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap",
                                                                       onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100));
      }
  
@@@ -1152,9 -1341,9 +1152,9 @@@
       * @return sstables whose key range overlaps with that of the given sstables, not including itself.
       * (The given sstables may or may not overlap with each other.)
       */
 -    public Collection<SSTableReader> getOverlappingSSTables(Iterable<SSTableReader> sstables)
 +    public Collection<SSTableReader> getOverlappingSSTables(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
      {
-         logger.debug("Checking for sstables overlapping {}", sstables);
+         logger.trace("Checking for sstables overlapping {}", sstables);
  
          // a normal compaction won't ever have an empty sstables list, but we create a skeleton
          // compaction controller for streaming, and that passes an empty list.
@@@ -1636,10 -2409,10 +1636,10 @@@
                  SSTableReader sstable = active.get(entries.getKey().generation);
                  if (sstable == null || !refs.tryRef(sstable))
                  {
-                     if (logger.isDebugEnabled())
-                         logger.debug("using snapshot sstable {}", entries.getKey());
+                     if (logger.isTraceEnabled())
+                         logger.trace("using snapshot sstable {}", entries.getKey());
                      // open without tracking hotness
 -                    sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false);
 +                    sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, true, false);
                      refs.tryRef(sstable);
                      // release the self ref as we never add the snapshot sstable to DataTracker where it is otherwise released
                      sstable.selfRef().release();
@@@ -1854,9 -2634,9 +1854,9 @@@
          // beginning if we restart before they [the CL segments] are discarded for
          // normal reasons post-truncate.  To prevent this, we store truncation
          // position in the System keyspace.
-         logger.debug("truncating {}", name);
+         logger.trace("truncating {}", name);
  
 -        if (keyspace.getMetadata().durableWrites || DatabaseDescriptor.isAutoSnapshot())
 +        if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
          {
              // flush the CF being truncated before forcing the new segment
              forceBlockingFlush();
@@@ -1887,47 -2670,30 +1887,47 @@@
  
                  ReplayPosition replayAfter = discardSSTables(truncatedAt);
  
 -                for (SecondaryIndex index : indexManager.getIndexes())
 -                    index.truncateBlocking(truncatedAt);
 +                indexManager.truncateAllIndexesBlocking(truncatedAt);
 +
 +                viewManager.truncateBlocking(truncatedAt);
  
                  SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
-                 logger.debug("cleaning out row cache");
+                 logger.trace("cleaning out row cache");
                  invalidateCaches();
              }
          };
  
 -        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
 +        runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
-         logger.debug("truncate complete");
+         logger.trace("truncate complete");
      }
  
 -    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
 +    /**
 +     * Discards the current memtable instead of writing it to disk. Only meant to be used while truncating
 +     * a column family that is not durable.
 +     */
 +    public void dumpMemtable()
 +    {
 +        synchronized (data)
 +        {
 +            // the same Flush instance is handed to the flush executor and, via its postFlush task,
 +            // to the post-flush executor
 +            final Flush memtableDrop = new Flush(true);
 +            flushExecutor.execute(memtableDrop);
 +            postFlushExecutor.submit(memtableDrop.postFlush);
 +        }
 +    }
 +
 +    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
      {
          // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
          // and so we only run one major compaction at a time
          synchronized (this)
          {
-             logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
+             logger.trace("Cancelling in-progress compactions for {}", metadata.cfName);
  
 -            Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
 -            for (ColumnFamilyStore cfs : selfWithIndexes)
 -                cfs.getCompactionStrategy().pause();
 +            Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
 +                                                               ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
 +                                                               : concatWithIndexes();
 +
 +            for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
 +                cfs.getCompactionStrategyManager().pause();
              try
              {
                  // interrupt in-progress compactions