Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/22 22:13:19 UTC
[8/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa60cde3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa60cde3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa60cde3
Branch: refs/heads/trunk
Commit: aa60cde3122a2b512ba4283b2bfd2deaff008004
Parents: 96eb58a 4a849ef
Author: blerer <be...@datastax.com>
Authored: Tue Sep 22 22:04:30 2015 +0200
Committer: blerer <be...@datastax.com>
Committed: Tue Sep 22 22:10:30 2015 +0200
----------------------------------------------------------------------
NEWS.txt | 16 +++---
conf/logback.xml | 55 ++++++++++++++++---
.../cassandra/auth/CassandraAuthorizer.java | 2 +-
.../cassandra/auth/CassandraRoleManager.java | 4 +-
.../cassandra/auth/PasswordAuthenticator.java | 4 +-
.../apache/cassandra/auth/PermissionsCache.java | 2 +-
.../org/apache/cassandra/auth/RolesCache.java | 2 +-
.../cassandra/batchlog/BatchlogManager.java | 10 ++--
.../batchlog/LegacyBatchlogMigrator.java | 8 +--
.../apache/cassandra/cache/AutoSavingCache.java | 8 +--
.../cassandra/cache/SerializingCache.java | 2 +-
.../org/apache/cassandra/client/RingCache.java | 2 +-
.../DebuggableScheduledThreadPoolExecutor.java | 2 +-
.../DebuggableThreadPoolExecutor.java | 2 +-
.../apache/cassandra/cql3/QueryProcessor.java | 6 +--
.../cql3/functions/JavaBasedUDFunction.java | 4 +-
.../cassandra/cql3/functions/UDFunction.java | 2 +-
.../cql3/statements/CreateIndexStatement.java | 2 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 44 +++++++--------
.../apache/cassandra/db/ConsistencyLevel.java | 6 +--
.../db/CounterMutationVerbHandler.java | 2 +-
.../db/DefinitionsUpdateVerbHandler.java | 2 +-
.../org/apache/cassandra/db/Directories.java | 8 +--
src/java/org/apache/cassandra/db/Keyspace.java | 8 +--
src/java/org/apache/cassandra/db/Memtable.java | 16 +++---
.../db/MigrationRequestVerbHandler.java | 2 +-
.../cassandra/db/SchemaCheckVerbHandler.java | 2 +-
.../cassandra/db/SizeEstimatesRecorder.java | 6 +--
.../org/apache/cassandra/db/SystemKeyspace.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 6 +--
.../db/commitlog/CommitLogArchiver.java | 4 +-
.../db/commitlog/CommitLogReplayer.java | 22 ++++----
.../db/commitlog/CommitLogSegmentManager.java | 14 ++---
.../db/compaction/CompactionController.java | 4 +-
.../db/compaction/CompactionManager.java | 30 +++++------
.../compaction/CompactionStrategyManager.java | 2 +-
.../cassandra/db/compaction/CompactionTask.java | 8 +--
.../DateTieredCompactionStrategy.java | 10 ++--
.../compaction/LeveledCompactionStrategy.java | 4 +-
.../db/compaction/LeveledManifest.java | 32 +++++------
.../SizeTieredCompactionStrategy.java | 4 +-
.../SplittingSizeTieredCompactionWriter.java | 4 +-
.../db/lifecycle/LifecycleTransaction.java | 20 +++----
.../cassandra/db/lifecycle/LogTransaction.java | 16 +++---
.../apache/cassandra/db/lifecycle/Tracker.java | 8 +--
.../org/apache/cassandra/dht/BootStrapper.java | 4 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 12 ++---
.../cassandra/hadoop/cql3/CqlInputFormat.java | 4 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 6 +--
...mitedLocalNodeFirstLocalBalancingPolicy.java | 14 ++---
.../cassandra/hadoop/pig/CqlNativeStorage.java | 2 +-
.../apache/cassandra/hints/HintVerbHandler.java | 2 +-
.../cassandra/hints/HintsDispatchExecutor.java | 2 +-
.../cassandra/index/SecondaryIndexManager.java | 6 +--
.../index/internal/CassandraIndex.java | 6 +--
.../io/sstable/IndexSummaryManager.java | 10 ++--
.../apache/cassandra/io/sstable/SSTable.java | 2 +-
.../io/sstable/format/SSTableReader.java | 24 ++++-----
.../io/sstable/metadata/MetadataSerializer.java | 8 +--
.../org/apache/cassandra/io/util/FileUtils.java | 8 +--
.../locator/AbstractReplicationStrategy.java | 2 +-
.../locator/NetworkTopologyStrategy.java | 2 +-
.../cassandra/locator/PropertyFileSnitch.java | 6 +--
.../locator/ReconnectableSnitchHelper.java | 2 +-
.../apache/cassandra/locator/TokenMetadata.java | 8 +--
.../net/IncomingStreamingConnection.java | 4 +-
.../cassandra/net/IncomingTcpConnection.java | 10 ++--
.../cassandra/net/MessageDeliveryTask.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 20 +++----
.../cassandra/net/OutboundTcpConnection.java | 12 ++---
.../cassandra/net/ResponseVerbHandler.java | 2 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 2 +-
.../cassandra/service/DigestResolver.java | 8 +--
.../apache/cassandra/service/GCInspector.java | 4 +-
.../cassandra/service/LoadBroadcaster.java | 4 +-
.../apache/cassandra/service/ReadCallback.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 14 ++---
.../cassandra/thrift/CassandraServer.java | 56 ++++++++++----------
.../thrift/CustomTThreadPoolServer.java | 4 +-
.../cassandra/thrift/ThriftValidation.java | 4 +-
.../org/apache/cassandra/tracing/Tracing.java | 2 +-
.../org/apache/cassandra/transport/Message.java | 6 +--
.../cassandra/triggers/CustomClassLoader.java | 2 +-
.../org/apache/cassandra/utils/CLibrary.java | 2 +-
.../org/apache/cassandra/utils/Mx4jTool.java | 4 +-
.../apache/cassandra/utils/OutputHandler.java | 2 +-
.../org/apache/cassandra/utils/TopKSampler.java | 2 +-
87 files changed, 383 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 67398cf,6bd0a77..e4b8663
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -122,6 -27,16 +122,12 @@@ Changed Default
- commitlog_total_space_in_mb will use the smaller of 8192, and 1/4
of the total space of the commitlog volume. (Before: always used
8192)
- - Incremental repair is on by default since 2.2.0, run full repairs by
- providing the '-full' parameter to nodetool repair.
- - Parallel repairs are the default since 2.2.0, run sequential repairs
- by providing the '-seq' parameter to nodetool repair.
+ - The following INFO logs were reduced to DEBUG level and will now appear
+ in debug.log instead of system.log:
+ - Memtable flushing actions
+ - Commit log replayed files
+ - Compacted sstables
+ - SSTable opening (SSTableReader)
New features
------------
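
The source hunks below apply the matching code-level change for the NEWS entry above: messages previously logged at debug are demoted to trace, and isDebugEnabled() guards become isTraceEnabled(), so the DEBUG-level output that now feeds debug.log stays readable. A minimal sketch of the pattern, assuming only the SLF4J API; the class and messages here are hypothetical, not from this commit:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlushLogSketch // hypothetical example, not part of this commit
{
    private static final Logger logger = LoggerFactory.getLogger(FlushLogSketch.class);

    public void onFlush(String table, long bytes)
    {
        // before this merge: logger.debug("Flushing {} ({} bytes)", table, bytes);
        logger.trace("Flushing {} ({} bytes)", table, bytes);

        // guards move with the level: isDebugEnabled() becomes isTraceEnabled()
        if (logger.isTraceEnabled())
            logger.trace("Flush details for {}", table);
    }
}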
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 934ebaa,0000000..8bc4c26
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@@ -1,554 -1,0 +1,554 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
+
+public class BatchlogManager implements BatchlogManagerMBean
+{
+ public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+ private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+ static final int DEFAULT_PAGE_SIZE = 128;
+
+ private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+ public static final BatchlogManager instance = new BatchlogManager();
+
+ private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+ private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
+
+ // Single-thread executor service for scheduling and serializing log replay.
+ private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+
+ public void start()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
+ StorageService.RING_DELAY,
+ REPLAY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+ batchlogTasks.shutdown();
+ batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+ public static void remove(UUID id)
+ {
+ new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
+ UUIDType.instance.decompose(id),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()))
+ .apply();
+ }
+
+ public static void store(Batch batch)
+ {
+ store(batch, true);
+ }
+
+ public static void store(Batch batch, boolean durableWrites)
+ {
+ RowUpdateBuilder builder =
+ new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
+ .clustering()
+ .add("version", MessagingService.current_version);
+
+ for (ByteBuffer mutation : batch.encodedMutations)
+ builder.addListEntry("mutations", mutation);
+
+ for (Mutation mutation : batch.decodedMutations)
+ {
+ try (DataOutputBuffer buffer = new DataOutputBuffer())
+ {
+ Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
+ builder.addListEntry("mutations", buffer.buffer());
+ }
+ catch (IOException e)
+ {
+ // shouldn't happen
+ throw new AssertionError(e);
+ }
+ }
+
+ builder.build().apply(durableWrites);
+ }
+
+ @VisibleForTesting
+ public int countAllBatches()
+ {
+ String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+ UntypedResultSet results = executeInternal(query);
+ if (results == null || results.isEmpty())
+ return 0;
+
+ return (int) results.one().getLong("count");
+ }
+
+ public long getTotalBatchesReplayed()
+ {
+ return totalBatchesReplayed;
+ }
+
+ public void forceBatchlogReplay() throws Exception
+ {
+ startBatchlogReplay().get();
+ }
+
+ public Future<?> startBatchlogReplay()
+ {
+ // If a replay is already in progress this request will be executed after it completes.
+ return batchlogTasks.submit(this::replayFailedBatches);
+ }
+
+ void performInitialReplay() throws InterruptedException, ExecutionException
+ {
+ // Invokes initial replay. Used for testing only.
+ batchlogTasks.submit(this::replayFailedBatches).get();
+ }
+
+ private void replayFailedBatches()
+ {
- logger.debug("Started replayFailedBatches");
++ logger.trace("Started replayFailedBatches");
+
+ // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+ int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+ RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
+ UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+ ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+ int pageSize = calculatePageSize(store);
+ // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+ // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+ // token(id) > token(lastReplayedUuid) as part of the query.
+ String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+ SystemKeyspace.NAME,
+ SystemKeyspace.BATCHES);
+ UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+ processBatchlogEntries(batches, pageSize, rateLimiter);
+ lastReplayedUuid = limitUuid;
- logger.debug("Finished replayFailedBatches");
++ logger.trace("Finished replayFailedBatches");
+ }
+
+ // read fewer rows (batches) per page if they are very large
+ static int calculatePageSize(ColumnFamilyStore store)
+ {
+ double averageRowSize = store.getMeanPartitionSize();
+ if (averageRowSize <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
+ }
+
+ private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
+ {
+ int positionInPage = 0;
+ ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
+
+ Set<InetAddress> hintedNodes = new HashSet<>();
+ Set<UUID> replayedBatches = new HashSet<>();
+
+ // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
+ for (UntypedResultSet.Row row : batches)
+ {
+ UUID id = row.getUUID("id");
+ int version = row.getInt("version");
+ try
+ {
+ ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
+ if (batch.replay(rateLimiter, hintedNodes) > 0)
+ {
+ unfinishedBatches.add(batch);
+ }
+ else
+ {
+ remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
+ ++totalBatchesReplayed;
+ }
+ }
+ catch (IOException e)
+ {
+ logger.warn("Skipped batch replay of {} due to {}", id, e);
+ remove(id);
+ }
+
+ if (++positionInPage == pageSize)
+ {
+ // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
+ // finish processing the page before requesting the next row.
+ finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+ positionInPage = 0;
+ }
+ }
+
+ finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+
+ // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
+ HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
+
+ // once all generated hints are fsynced, actually delete the batches
+ replayedBatches.forEach(BatchlogManager::remove);
+ }
+
+ private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
+ {
+ // schedule hints for timed out deliveries
+ for (ReplayingBatch batch : batches)
+ {
+ batch.finish(hintedNodes);
+ replayedBatches.add(batch.id);
+ }
+
+ totalBatchesReplayed += batches.size();
+ batches.clear();
+ }
+
+ public static long getBatchlogTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+ }
+
+ private static class ReplayingBatch
+ {
+ private final UUID id;
+ private final long writtenAt;
+ private final List<Mutation> mutations;
+ private final int replayedBytes;
+
+ private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
+
+ ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
+ {
+ this.id = id;
+ this.writtenAt = UUIDGen.unixTimestamp(id);
+ this.mutations = new ArrayList<>(serializedMutations.size());
+ this.replayedBytes = addMutations(version, serializedMutations);
+ }
+
+ public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
+ {
- logger.debug("Replaying batch {}", id);
++ logger.trace("Replaying batch {}", id);
+
+ if (mutations.isEmpty())
+ return 0;
+
+ int gcgs = gcgs(mutations);
+ if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+ return 0;
+
+ replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
+
+ rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
+
+ return replayHandlers.size();
+ }
+
+ public void finish(Set<InetAddress> hintedNodes)
+ {
+ for (int i = 0; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException|WriteFailureException e)
+ {
- logger.debug("Failed replaying a batched mutation to a node, will write a hint");
- logger.debug("Failure was : {}", e.getMessage());
++ logger.trace("Failed replaying a batched mutation to a node, will write a hint");
++ logger.trace("Failure was : {}", e.getMessage());
+ // write hints for the remaining mutations, starting from i
+ writeHintsForUndeliveredEndpoints(i, hintedNodes);
+ return;
+ }
+ }
+ }
+
+ private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
+ {
+ int ret = 0;
+ for (ByteBuffer serializedMutation : serializedMutations)
+ {
+ ret += serializedMutation.remaining();
+ try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
+ {
+ addMutation(Mutation.serializer.deserialize(in, version));
+ }
+ }
+
+ return ret;
+ }
+
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as delivered then
+ // truncated).
+ private void addMutation(Mutation mutation)
+ {
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
+
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+
+ private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
+ {
+ int gcgs = gcgs(mutations);
+
+ // expired
+ if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+ return;
+
+ for (int i = startFrom; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+ Mutation undeliveredMutation = mutations.get(i);
+
+ if (handler != null)
+ {
+ hintedNodes.addAll(handler.undelivered);
+ HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+ Hint.create(undeliveredMutation, writtenAt));
+ }
+ }
+ }
+
+ private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
+ long writtenAt,
+ Set<InetAddress> hintedNodes)
+ {
+ List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
+ for (Mutation mutation : mutations)
+ {
+ ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
+ if (handler != null)
+ handlers.add(handler);
+ }
+ return handlers;
+ }
+
+ /**
+ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+ * when a replica is down or a write request times out.
+ *
+ * @return direct delivery handler to wait on or null, if no live nodes found
+ */
+ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
+ long writtenAt,
+ Set<InetAddress> hintedNodes)
+ {
+ Set<InetAddress> liveEndpoints = new HashSet<>();
+ String ks = mutation.getKeyspaceName();
+ Token tk = mutation.key().getToken();
+
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ {
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ mutation.apply();
+ }
+ else if (FailureDetector.instance.isAlive(endpoint))
+ {
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ }
+ else
+ {
+ hintedNodes.add(endpoint);
+ HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+ Hint.create(mutation, writtenAt));
+ }
+ }
+
+ if (liveEndpoints.isEmpty())
+ return null;
+
+ ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
+ MessageOut<Mutation> message = mutation.createMessage();
+ for (InetAddress endpoint : liveEndpoints)
+ MessagingService.instance().sendRR(message, endpoint, handler, false);
+ return handler;
+ }
+
+ private static int gcgs(Collection<Mutation> mutations)
+ {
+ int gcgs = Integer.MAX_VALUE;
+ for (Mutation mutation : mutations)
+ gcgs = Math.min(gcgs, mutation.smallestGCGS());
+ return gcgs;
+ }
+
+ /**
+ * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
+ * which we did not receive a successful reply.
+ */
+ private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
+ {
+ private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+ {
+ super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+ undelivered.addAll(writeEndpoints);
+ }
+
+ @Override
+ protected int totalBlockFor()
+ {
+ return this.naturalEndpoints.size();
+ }
+
+ @Override
+ public void response(MessageIn<T> m)
+ {
+ boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
+ assert removed;
+ super.response(m);
+ }
+ }
+ }
+
+ public static class EndpointFilter
+ {
+ private final String localRack;
+ private final Multimap<String, InetAddress> endpoints;
+
+ public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ {
+ this.localRack = localRack;
+ this.endpoints = endpoints;
+ }
+
+ /**
+ * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+ */
+ public Collection<InetAddress> filter()
+ {
+ // special case for single-node data centers
+ if (endpoints.values().size() == 1)
+ return endpoints.values();
+
+ // strip out dead endpoints and localhost
+ ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+ for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+ if (isValid(entry.getValue()))
+ validated.put(entry.getKey(), entry.getValue());
+
+ if (validated.size() <= 2)
+ return validated.values();
+
+ if (validated.size() - validated.get(localRack).size() >= 2)
+ {
+ // we have enough endpoints in other racks
+ validated.removeAll(localRack);
+ }
+
+ if (validated.keySet().size() == 1)
+ {
+ // we have only 1 `other` rack
+ Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+ return Lists.newArrayList(Iterables.limit(otherRack, 2));
+ }
+
+ // randomize which racks we pick from if more than 2 remaining
+ Collection<String> racks;
+ if (validated.keySet().size() == 2)
+ {
+ racks = validated.keySet();
+ }
+ else
+ {
+ racks = Lists.newArrayList(validated.keySet());
+ Collections.shuffle((List<String>) racks);
+ }
+
+ // grab a random member of up to two racks
+ List<InetAddress> result = new ArrayList<>(2);
+ for (String rack : Iterables.limit(racks, 2))
+ {
+ List<InetAddress> rackMembers = validated.get(rack);
+ result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+ }
+
+ return result;
+ }
+
+ @VisibleForTesting
+ protected boolean isValid(InetAddress input)
+ {
+ return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+ }
+
+ @VisibleForTesting
+ protected int getRandomInt(int bound)
+ {
+ return ThreadLocalRandom.current().nextInt(bound);
+ }
+ }
+}
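
For reference, the replay throttle computed at the top of replayFailedBatches() splits the configured rate evenly across the nodes in the cluster, and a cassandra.yaml setting of 0 disables it by creating an effectively unbounded limiter. A standalone sketch of just that computation, assuming only Guava's RateLimiter; the helper name is made up:

import com.google.common.util.concurrent.RateLimiter;

public class ReplayThrottleSketch // hypothetical helper mirroring the math above
{
    // the configured throttle is KB/s for the whole cluster; the limiter meters bytes/s
    static RateLimiter forCluster(int configuredThrottleInKB, int clusterSize)
    {
        int throttleInKB = configuredThrottleInKB / clusterSize;
        return RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
    }
}

For example, a 1024 KB/s setting on a four-node cluster yields a 256 KB/s (262144 bytes/s) limiter on each node.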
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
index 13ff81a,0000000..dd19f19
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@@ -1,196 -1,0 +1,196 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class LegacyBatchlogMigrator
+{
+ private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class);
+
+ private LegacyBatchlogMigrator()
+ {
+ // static class
+ }
+
+ @SuppressWarnings("deprecation")
+ public static void migrate()
+ {
+ ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG);
+
+ // nothing to migrate
+ if (store.isEmpty())
+ return;
+
+ logger.info("Migrating legacy batchlog to new storage");
+
+ int convertedBatches = 0;
+ String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+ SystemKeyspace.NAME,
+ SystemKeyspace.LEGACY_BATCHLOG);
+
+ int pageSize = BatchlogManager.calculatePageSize(store);
+
+ UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize);
+ for (UntypedResultSet.Row row : rows)
+ {
+ if (apply(row, convertedBatches))
+ convertedBatches++;
+ }
+
+ if (convertedBatches > 0)
+ Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+ }
+
+ @SuppressWarnings("deprecation")
+ public static boolean isLegacyBatchlogMutation(Mutation mutation)
+ {
+ return mutation.getKeyspaceName().equals(SystemKeyspace.NAME)
+ && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null;
+ }
+
+ @SuppressWarnings("deprecation")
+ public static void handleLegacyMutation(Mutation mutation)
+ {
+ PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId);
- logger.debug("Applying legacy batchlog mutation {}", update);
++ logger.trace("Applying legacy batchlog mutation {}", update);
+ update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1));
+ }
+
+ private static boolean apply(UntypedResultSet.Row row, long counter)
+ {
+ UUID id = row.getUUID("id");
+ long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at");
+ int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+
+ if (id.version() != 1)
+ id = UUIDGen.getTimeUUID(timestamp, counter);
+
- logger.debug("Converting mutation at {}", timestamp);
++ logger.trace("Converting mutation at {}", timestamp);
+
+ try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false))
+ {
+ int numMutations = in.readInt();
+ List<Mutation> mutations = new ArrayList<>(numMutations);
+ for (int i = 0; i < numMutations; i++)
+ mutations.add(Mutation.serializer.deserialize(in, version));
+
+ BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations));
+ return true;
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t);
+ return false;
+ }
+ }
+
+ public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
+ throws WriteTimeoutException, WriteFailureException
+ {
+ for (InetAddress target : endpoints)
+ {
- logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
++ logger.trace("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
+ int targetVersion = MessagingService.instance().getVersion(target);
+ MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION),
+ target,
+ handler,
+ false);
+ }
+ }
+
+ public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+ {
+ AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
+ Collections.<InetAddress>emptyList(),
+ ConsistencyLevel.ANY,
+ Keyspace.open(SystemKeyspace.NAME),
+ null,
+ WriteType.SIMPLE);
+ Mutation mutation = getRemoveMutation(uuid);
+
+ for (InetAddress target : endpoints)
+ {
- logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target);
++ logger.trace("Sending legacy batchlog remove request {} to {}", uuid, target);
+ MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false);
+ }
+ }
+
+ static void store(Batch batch, int version)
+ {
+ getStoreMutation(batch, version).apply();
+ }
+
+ @SuppressWarnings("deprecation")
+ static Mutation getStoreMutation(Batch batch, int version)
+ {
+ return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id)
+ .clustering()
+ .add("written_at", new Date(batch.creationTime / 1000))
+ .add("data", getSerializedMutations(version, batch.decodedMutations))
+ .add("version", version)
+ .build();
+ }
+
+ @SuppressWarnings("deprecation")
+ private static Mutation getRemoveMutation(UUID uuid)
+ {
+ return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog,
+ UUIDType.instance.decompose(uuid),
+ FBUtilities.timestampMicros(),
+ FBUtilities.nowInSeconds()));
+ }
+
+ private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations)
+ {
+ try (DataOutputBuffer buf = new DataOutputBuffer())
+ {
+ buf.writeInt(mutations.size());
+ for (Mutation mutation : mutations)
+ Mutation.serializer.serialize(mutation, buf, version);
+ return buf.buffer();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
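
For context, the legacy 'data' blob that apply() and getSerializedMutations() above read and write is a 4-byte mutation count followed by each mutation serialized at the stated messaging version. A hedged sketch of that framing using plain java.io streams in place of Cassandra's buffer classes; the names here are illustrative only:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

public class LegacyBlobSketch // hypothetical, for illustration only
{
    static ByteBuffer frame(List<byte[]> serializedMutations) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(serializedMutations.size()); // count prefix, as in getSerializedMutations()
        for (byte[] mutation : serializedMutations)
            out.write(mutation);                  // pre-serialized mutations, concatenated
        out.flush();
        return ByteBuffer.wrap(bytes.toByteArray());
    }
}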
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index 2aafeb9,0000000..1db13e3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@@ -1,628 -1,0 +1,628 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.functions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationTargetException;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.security.*;
+import java.security.cert.Certificate;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.DataType;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.eclipse.jdt.core.compiler.IProblem;
+import org.eclipse.jdt.internal.compiler.*;
+import org.eclipse.jdt.internal.compiler.Compiler;
+import org.eclipse.jdt.internal.compiler.classfmt.ClassFileReader;
+import org.eclipse.jdt.internal.compiler.classfmt.ClassFormatException;
+import org.eclipse.jdt.internal.compiler.env.ICompilationUnit;
+import org.eclipse.jdt.internal.compiler.env.INameEnvironment;
+import org.eclipse.jdt.internal.compiler.env.NameEnvironmentAnswer;
+import org.eclipse.jdt.internal.compiler.impl.CompilerOptions;
+import org.eclipse.jdt.internal.compiler.problem.DefaultProblemFactory;
+
+final class JavaBasedUDFunction extends UDFunction
+{
+ private static final String BASE_PACKAGE = "org.apache.cassandra.cql3.udf.gen";
+
+ static final Logger logger = LoggerFactory.getLogger(JavaBasedUDFunction.class);
+
+ private static final AtomicInteger classSequence = new AtomicInteger();
+
+ // use a JVM standard ExecutorService as DebuggableThreadPoolExecutor references internal
+ // classes, which triggers AccessControlException from the UDF sandbox
+ private static final UDFExecutorService executor =
+ new UDFExecutorService(new NamedThreadFactory("UserDefinedFunctions",
+ Thread.MIN_PRIORITY,
+ udfClassLoader,
+ new SecurityThreadGroup("UserDefinedFunctions", null)),
+ "userfunction");
+
+ private static final EcjTargetClassLoader targetClassLoader = new EcjTargetClassLoader();
+
+ private static final UDFByteCodeVerifier udfByteCodeVerifier = new UDFByteCodeVerifier();
+
+ private static final ProtectionDomain protectionDomain;
+
+ private static final IErrorHandlingPolicy errorHandlingPolicy = DefaultErrorHandlingPolicies.proceedWithAllProblems();
+ private static final IProblemFactory problemFactory = new DefaultProblemFactory(Locale.ENGLISH);
+ private static final CompilerOptions compilerOptions;
+
+ /**
+ * Poor man's template - just a text file split at '#' chars.
+ * Each string at an even index is a constant string (just copied),
+ * each string at an odd index is an 'instruction'.
+ */
+ private static final String[] javaSourceTemplate;
+
+ static
+ {
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "forName");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getClassLoader");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResource");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/Class", "getResourceAsStream");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "clearAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResource");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResourceAsStream");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getResources");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemClassLoader");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResource");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResourceAsStream");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "getSystemResources");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "loadClass");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setClassAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setDefaultAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/lang/ClassLoader", "setPackageAssertionStatus");
+ udfByteCodeVerifier.addDisallowedMethodCall("java/nio/ByteBuffer", "allocateDirect");
+
+ Map<String, String> settings = new HashMap<>();
+ settings.put(CompilerOptions.OPTION_LineNumberAttribute,
+ CompilerOptions.GENERATE);
+ settings.put(CompilerOptions.OPTION_SourceFileAttribute,
+ CompilerOptions.DISABLED);
+ settings.put(CompilerOptions.OPTION_ReportDeprecation,
+ CompilerOptions.IGNORE);
+ settings.put(CompilerOptions.OPTION_Source,
+ CompilerOptions.VERSION_1_8);
+ settings.put(CompilerOptions.OPTION_TargetPlatform,
+ CompilerOptions.VERSION_1_8);
+
+ compilerOptions = new CompilerOptions(settings);
+ compilerOptions.parseLiteralExpressionsAsConstants = true;
+
+ try (InputStream input = JavaBasedUDFunction.class.getResource("JavaSourceUDF.txt").openConnection().getInputStream())
+ {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ FBUtilities.copy(input, output, Long.MAX_VALUE);
+ String template = output.toString();
+
+ StringTokenizer st = new StringTokenizer(template, "#");
+ javaSourceTemplate = new String[st.countTokens()];
+ for (int i = 0; st.hasMoreElements(); i++)
+ javaSourceTemplate[i] = st.nextToken();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ CodeSource codeSource;
+ try
+ {
+ codeSource = new CodeSource(new URL("udf", "localhost", 0, "/java", new URLStreamHandler()
+ {
+ protected URLConnection openConnection(URL u)
+ {
+ return null;
+ }
+ }), (Certificate[])null);
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ protectionDomain = new ProtectionDomain(codeSource, ThreadAwareSecurityManager.noPermissions, targetClassLoader, null);
+ }
+
+ private final JavaUDF javaUDF;
+
+ JavaBasedUDFunction(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,
+ AbstractType<?> returnType, boolean calledOnNullInput, String body)
+ {
+ super(name, argNames, argTypes, UDHelper.driverTypes(argTypes),
+ returnType, UDHelper.driverType(returnType), calledOnNullInput, "java", body);
+
+ // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
+ Class<?>[] javaParamTypes = UDHelper.javaTypes(argDataTypes, calledOnNullInput);
+ // javaReturnType is just the Java representation for returnType resp. returnDataType
+ Class<?> javaReturnType = UDHelper.asJavaClass(returnDataType);
+
+ // put each UDF in a separate package to prevent cross-UDF code access
+ String pkgName = BASE_PACKAGE + '.' + generateClassName(name, 'p');
+ String clsName = generateClassName(name, 'C');
+
+ String executeInternalName = generateClassName(name, 'x');
+
+ StringBuilder javaSourceBuilder = new StringBuilder();
+ int lineOffset = 1;
+ for (int i = 0; i < javaSourceTemplate.length; i++)
+ {
+ String s = javaSourceTemplate[i];
+
+ // strings at odd indexes are 'instructions'
+ if ((i & 1) == 1)
+ {
+ switch (s)
+ {
+ case "package_name":
+ s = pkgName;
+ break;
+ case "class_name":
+ s = clsName;
+ break;
+ case "body":
+ lineOffset = countNewlines(javaSourceBuilder);
+ s = body;
+ break;
+ case "arguments":
+ s = generateArguments(javaParamTypes, argNames);
+ break;
+ case "argument_list":
+ s = generateArgumentList(javaParamTypes, argNames);
+ break;
+ case "return_type":
+ s = javaSourceName(javaReturnType);
+ break;
+ case "execute_internal_name":
+ s = executeInternalName;
+ break;
+ }
+ }
+
+ javaSourceBuilder.append(s);
+ }
+
+ String targetClassName = pkgName + '.' + clsName;
+
+ String javaSource = javaSourceBuilder.toString();
+
- logger.debug("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource);
++ logger.trace("Compiling Java source UDF '{}' as class '{}' using source:\n{}", name, targetClassName, javaSource);
+
+ try
+ {
+ EcjCompilationUnit compilationUnit = new EcjCompilationUnit(javaSource, targetClassName);
+
+ org.eclipse.jdt.internal.compiler.Compiler compiler = new Compiler(compilationUnit,
+ errorHandlingPolicy,
+ compilerOptions,
+ compilationUnit,
+ problemFactory);
+ compiler.compile(new ICompilationUnit[]{ compilationUnit });
+
+ if (compilationUnit.problemList != null && !compilationUnit.problemList.isEmpty())
+ {
+ boolean fullSource = false;
+ StringBuilder problems = new StringBuilder();
+ for (IProblem problem : compilationUnit.problemList)
+ {
+ long ln = problem.getSourceLineNumber() - lineOffset;
+ if (ln < 1L)
+ {
+ if (problem.isError())
+ {
+ // if the generated source wrapped around the user-provided UDF body is buggy,
+ // the full generated source is appended to the error.
+ problems.append("GENERATED SOURCE ERROR: line ")
+ .append(problem.getSourceLineNumber())
+ .append(" (in generated source): ")
+ .append(problem.getMessage())
+ .append('\n');
+ fullSource = true;
+ }
+ }
+ else
+ {
+ problems.append("Line ")
+ .append(Long.toString(ln))
+ .append(": ")
+ .append(problem.getMessage())
+ .append('\n');
+ }
+ }
+
+ if (fullSource)
+ throw new InvalidRequestException("Java source compilation failed:\n" + problems + "\n generated source:\n" + javaSource);
+ else
+ throw new InvalidRequestException("Java source compilation failed:\n" + problems);
+ }
+
+ // Verify the UDF byte code for use of potentially dangerous code
+ Set<String> errors = udfByteCodeVerifier.verify(targetClassLoader.classData(targetClassName));
+ String validDeclare = "not allowed method declared: " + executeInternalName + '(';
+ String validCall = "call to " + targetClassName.replace('.', '/') + '.' + executeInternalName + "()";
+ for (Iterator<String> i = errors.iterator(); i.hasNext();)
+ {
+ String error = i.next();
+ // we generate a random name for the private, internal execute method, which is detected by the byte-code verifier
+ if (error.startsWith(validDeclare) || error.equals(validCall))
+ {
+ i.remove();
+ }
+ }
+ if (!errors.isEmpty())
+ throw new InvalidRequestException("Java UDF validation failed: " + errors);
+
+ // Load the class and create a new instance of it
+ Thread thread = Thread.currentThread();
+ ClassLoader orig = thread.getContextClassLoader();
+ try
+ {
+ thread.setContextClassLoader(UDFunction.udfClassLoader);
+ // Execute UDF initialization from the UDF class loader
+
+ Class cls = Class.forName(targetClassName, false, targetClassLoader);
+
+ if (cls.getDeclaredMethods().length != 2 || cls.getDeclaredConstructors().length != 1)
+ throw new InvalidRequestException("Check your source to not define additional Java methods or constructors");
+ MethodType methodType = MethodType.methodType(void.class)
+ .appendParameterTypes(DataType.class, DataType[].class);
+ MethodHandle ctor = MethodHandles.lookup().findConstructor(cls, methodType);
+ this.javaUDF = (JavaUDF) ctor.invokeWithArguments(returnDataType, argDataTypes);
+ }
+ finally
+ {
+ thread.setContextClassLoader(orig);
+ }
+ }
+ catch (InvocationTargetException e)
+ {
+ // in case of an ITE, use the cause
+ throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e.getCause()));
+ }
+ catch (VirtualMachineError e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ throw new InvalidRequestException(String.format("Could not compile function '%s' from Java source: %s", name, e));
+ }
+ }
+
+ protected ExecutorService executor()
+ {
+ return executor;
+ }
+
+ protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params)
+ {
+ return javaUDF.executeImpl(protocolVersion, params);
+ }
+
+
+ private static int countNewlines(StringBuilder javaSource)
+ {
+ int ln = 0;
+ for (int i = 0; i < javaSource.length(); i++)
+ if (javaSource.charAt(i) == '\n')
+ ln++;
+ return ln;
+ }
+
+ private static String generateClassName(FunctionName name, char prefix)
+ {
+ String qualifiedName = name.toString();
+
+ StringBuilder sb = new StringBuilder(qualifiedName.length() + 10);
+ sb.append(prefix);
+ for (int i = 0; i < qualifiedName.length(); i++)
+ {
+ char c = qualifiedName.charAt(i);
+ if (Character.isJavaIdentifierPart(c))
+ sb.append(c);
+ else
+ sb.append(Integer.toHexString(((short)c)&0xffff));
+ }
+ sb.append('_')
+ .append(ThreadLocalRandom.current().nextInt() & 0xffffff)
+ .append('_')
+ .append(classSequence.incrementAndGet());
+ return sb.toString();
+ }
+
+ private static String javaSourceName(Class<?> type)
+ {
+ String n = type.getName();
+ return n.startsWith("java.lang.") ? type.getSimpleName() : n;
+ }
+
+ private static String generateArgumentList(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
+ {
+ // initial builder size can just be a guess (prevent temp object allocations)
+ StringBuilder code = new StringBuilder(32 * paramTypes.length);
+ for (int i = 0; i < paramTypes.length; i++)
+ {
+ if (i > 0)
+ code.append(", ");
+ code.append(javaSourceName(paramTypes[i]))
+ .append(' ')
+ .append(argNames.get(i));
+ }
+ return code.toString();
+ }
+
+ private static String generateArguments(Class<?>[] paramTypes, List<ColumnIdentifier> argNames)
+ {
+ StringBuilder code = new StringBuilder(64 * paramTypes.length);
+ for (int i = 0; i < paramTypes.length; i++)
+ {
+ if (i > 0)
+ code.append(",\n");
+
- if (logger.isDebugEnabled())
++ if (logger.isTraceEnabled())
+ code.append(" /* parameter '").append(argNames.get(i)).append("' */\n");
+
+ code
+ // cast to Java type
+ .append(" (").append(javaSourceName(paramTypes[i])).append(") ")
+ // generate object representation of input parameter (call UDFunction.compose)
+ .append(composeMethod(paramTypes[i])).append("(protocolVersion, ").append(i).append(", params.get(").append(i).append("))");
+ }
+ return code.toString();
+ }
+
+ private static String composeMethod(Class<?> type)
+ {
+ return (type.isPrimitive()) ? ("super.compose_" + type.getName()) : "super.compose";
+ }
+
+ // Java source UDFs are a very simple compilation task, which allows us to let one class implement
+ // all interfaces required by ECJ.
+ static final class EcjCompilationUnit implements ICompilationUnit, ICompilerRequestor, INameEnvironment
+ {
+ List<IProblem> problemList;
+ private final String className;
+ private final char[] sourceCode;
+
+ EcjCompilationUnit(String sourceCode, String className)
+ {
+ this.className = className;
+ this.sourceCode = sourceCode.toCharArray();
+ }
+
+ // ICompilationUnit
+
+ @Override
+ public char[] getFileName()
+ {
+ return sourceCode;
+ }
+
+ @Override
+ public char[] getContents()
+ {
+ return sourceCode;
+ }
+
+ @Override
+ public char[] getMainTypeName()
+ {
+ int dot = className.lastIndexOf('.');
+ return ((dot > 0) ? className.substring(dot + 1) : className).toCharArray();
+ }
+
+ @Override
+ public char[][] getPackageName()
+ {
+ StringTokenizer izer = new StringTokenizer(className, ".");
+ char[][] result = new char[izer.countTokens() - 1][];
+ for (int i = 0; i < result.length; i++)
+ result[i] = izer.nextToken().toCharArray();
+ return result;
+ }
+
+ @Override
+ public boolean ignoreOptionalProblems()
+ {
+ return false;
+ }
+
+ // ICompilerRequestor
+
+ @Override
+ public void acceptResult(CompilationResult result)
+ {
+ if (result.hasErrors())
+ {
+ IProblem[] problems = result.getProblems();
+ if (problemList == null)
+ problemList = new ArrayList<>(problems.length);
+ Collections.addAll(problemList, problems);
+ }
+ else
+ {
+ ClassFile[] classFiles = result.getClassFiles();
+ for (ClassFile classFile : classFiles)
+ targetClassLoader.addClass(className, classFile.getBytes());
+ }
+ }
+
+ // INameEnvironment
+
+ @Override
+ public NameEnvironmentAnswer findType(char[][] compoundTypeName)
+ {
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < compoundTypeName.length; i++)
+ {
+ if (i > 0)
+ result.append('.');
+ result.append(compoundTypeName[i]);
+ }
+ return findType(result.toString());
+ }
+
+ @Override
+ public NameEnvironmentAnswer findType(char[] typeName, char[][] packageName)
+ {
+ StringBuilder result = new StringBuilder();
+ int i = 0;
+ for (; i < packageName.length; i++)
+ {
+ if (i > 0)
+ result.append('.');
+ result.append(packageName[i]);
+ }
+ if (i > 0)
+ result.append('.');
+ result.append(typeName);
+ return findType(result.toString());
+ }
+
+ private NameEnvironmentAnswer findType(String className)
+ {
+ if (className.equals(this.className))
+ {
+ return new NameEnvironmentAnswer(this, null);
+ }
+
+ String resourceName = className.replace('.', '/') + ".class";
+
+ try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName))
+ {
+ if (is != null)
+ {
+ byte[] classBytes = ByteStreams.toByteArray(is);
+ char[] fileName = className.toCharArray();
+ ClassFileReader classFileReader = new ClassFileReader(classBytes, fileName, true);
+ return new NameEnvironmentAnswer(classFileReader, null);
+ }
+ }
+ catch (IOException | ClassFormatException exc)
+ {
+ throw new RuntimeException(exc);
+ }
+ return null;
+ }
+
+ private boolean isPackage(String result)
+ {
+ if (result.equals(this.className))
+ return false;
+ String resourceName = result.replace('.', '/') + ".class";
+ try (InputStream is = UDFunction.udfClassLoader.getResourceAsStream(resourceName))
+ {
+ return is == null;
+ }
+ catch (IOException e)
+ {
+ // we get here only if close() on 'is' failed, which means it was not null
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isPackage(char[][] parentPackageName, char[] packageName)
+ {
+ StringBuilder result = new StringBuilder();
+ int i = 0;
+ if (parentPackageName != null)
+ for (; i < parentPackageName.length; i++)
+ {
+ if (i > 0)
+ result.append('.');
+ result.append(parentPackageName[i]);
+ }
+
+ if (Character.isUpperCase(packageName[0]) && !isPackage(result.toString()))
+ return false;
+ if (i > 0)
+ result.append('.');
+ result.append(packageName);
+
+ return isPackage(result.toString());
+ }
+
+ @Override
+ public void cleanup()
+ {
+ }
+ }
+
+ static final class EcjTargetClassLoader extends SecureClassLoader
+ {
+ EcjTargetClassLoader()
+ {
+ super(UDFunction.udfClassLoader);
+ }
+
+ // This map is usually empty.
+ // It only contains data *during* UDF compilation but not during runtime.
+ //
+ // addClass() is invoked by ECJ after successful compilation of the generated Java source.
+ // loadClass(targetClassName) is invoked by buildUDF() after ECJ returned from successful compilation.
+ //
+ private final Map<String, byte[]> classes = new ConcurrentHashMap<>();
+
+ void addClass(String className, byte[] classData)
+ {
+ classes.put(className, classData);
+ }
+
+ byte[] classData(String className)
+ {
+ return classes.get(className);
+ }
+
+ protected Class<?> findClass(String name) throws ClassNotFoundException
+ {
+ // remove the class binary - it is only used once, so keeping it would just waste heap
+ byte[] classData = classes.remove(name);
+
+ if (classData != null)
+ return defineClass(name, classData, 0, classData.length, protectionDomain);
+
+ return getParent().loadClass(name);
+ }
+
+ protected PermissionCollection getPermissions(CodeSource codesource)
+ {
+ return ThreadAwareSecurityManager.noPermissions;
+ }
+ }}
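
The 'poor man's template' convention described at the top of JavaBasedUDFunction (a source file split at '#' characters, even-indexed tokens copied verbatim, odd-indexed tokens treated as instructions) can be illustrated with a small standalone sketch; the template string and substitutions below are invented, not the real JavaSourceUDF.txt contents:

import java.util.StringTokenizer;

public class TemplateSketch // hypothetical illustration of the '#' template convention
{
    public static void main(String[] args)
    {
        String template = "public final class #class_name# { #body# }";
        StringTokenizer st = new StringTokenizer(template, "#");
        StringBuilder out = new StringBuilder();
        for (int i = 0; st.hasMoreElements(); i++)
        {
            String s = st.nextToken();
            if ((i & 1) == 1) // odd index: an 'instruction' to substitute
                s = s.equals("class_name") ? "C" : "/* compiled UDF body */";
            out.append(s);
        }
        System.out.println(out); // public final class C { /* compiled UDF body */ }
    }
}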
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index e21d8af,1e5cea6..a07852d
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -263,160 -143,11 +263,160 @@@ public abstract class UDFunction extend
return null;
long tStart = System.nanoTime();
- ByteBuffer result = executeUserDefined(protocolVersion, parameters);
- Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
- return result;
+ parameters = makeEmptyParametersNull(parameters);
+
+ try
+ {
+ // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr).
+ ByteBuffer result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
+ ? executeAsync(protocolVersion, parameters)
+ : executeUserDefined(protocolVersion, parameters);
+
+ Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
+ return result;
+ }
+ catch (InvalidRequestException e)
+ {
+ throw e;
+ }
+ catch (Throwable t)
+ {
- logger.debug("Invocation of user-defined function '{}' failed", this, t);
++ logger.trace("Invocation of user-defined function '{}' failed", this, t);
+ if (t instanceof VirtualMachineError)
+ throw (VirtualMachineError) t;
+ throw FunctionExecutionException.create(this, t);
+ }
+ }
+
+ public static void assertUdfsEnabled(String language)
+ {
+ if (!DatabaseDescriptor.enableUserDefinedFunctions())
+ throw new InvalidRequestException("User-defined functions are disabled in cassandra.yaml - set enable_user_defined_functions=true to enable");
+ if (!"java".equalsIgnoreCase(language) && !DatabaseDescriptor.enableScriptedUserDefinedFunctions())
+ throw new InvalidRequestException("Scripted user-defined functions are disabled in cassandra.yaml - set enable_scripted_user_defined_functions=true to enable if you are aware of the security risks");
+ }
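
Both guards map to cassandra.yaml settings named in the error messages above; enabling UDFs therefore looks like this (scripted, i.e. non-Java, UDFs need the second flag as well):

    enable_user_defined_functions: true
    # only for non-Java UDF languages; read the security caveats first
    enable_scripted_user_defined_functions: true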
+
+ private static final class ThreadIdAndCpuTime extends CompletableFuture<Object>
+ {
+ long threadId;
+ long cpuTime;
+
+ ThreadIdAndCpuTime()
+ {
+ // This call looks odd, but it deliberately links this class to java.lang.management - otherwise (script) UDFs might fail with
+ // java.security.AccessControlException: access denied: ("java.lang.RuntimePermission" "accessClassInPackage.java.lang.management")
+ // because class loading would be deferred until setup() is executed - and setup() runs with
+ // limited privileges.
+ threadMXBean.getCurrentThreadCpuTime();
+ //
+ // Eagerly initialize the Java driver's TypeCodec machinery, for the same reason.
+ UDHelper.codecRegistry.codecFor(DataType.inet()).format(InetAddress.getLoopbackAddress());
+ UDHelper.codecRegistry.codecFor(DataType.ascii()).format("");
+ }
+
+ void setup()
+ {
+ this.threadId = Thread.currentThread().getId();
+ this.cpuTime = threadMXBean.getCurrentThreadCpuTime();
+ complete(null);
+ }
+ }
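
The same eager-initialization trick in isolation: touch the privileged classes while the constructor still runs unrestricted, so nothing needs loading later under the restricted UDF sandbox (the sandbox itself is assumed here, not shown):

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadMXBean;

    final class EagerInit
    {
        static final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

        static
        {
            // forces java.lang.management class loading now, with full privileges;
            // deferring it into sandboxed code could trigger
            // AccessControlException ("accessClassInPackage.java.lang.management")
            threadMXBean.getCurrentThreadCpuTime();
        }
    }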
+
+ private ByteBuffer executeAsync(int protocolVersion, List<ByteBuffer> parameters)
+ {
+ ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime();
+
+ Future<ByteBuffer> future = executor().submit(() -> {
+ threadIdAndCpuTime.setup();
+ return executeUserDefined(protocolVersion, parameters);
+ });
+
+ try
+ {
+ if (DatabaseDescriptor.getUserDefinedFunctionWarnTimeout() > 0)
+ try
+ {
+ return future.get(DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ // log and emit a client warning that UDF execution took too long
+ String warn = String.format("User defined function %s ran longer than %dms", this, DatabaseDescriptor.getUserDefinedFunctionWarnTimeout());
+ logger.warn(warn);
+ ClientWarn.warn(warn);
+ }
+
+ // retry for the remaining time, i.e. fail-timeout minus warn-timeout
+ return future.get(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable c = e.getCause();
+ if (c instanceof RuntimeException)
+ throw (RuntimeException) c;
+ throw new RuntimeException(c);
+ }
+ catch (TimeoutException e)
+ {
+ // retry one last time, allowing the fail-timeout minus the CPU time already consumed (in case execution hit a badly timed GC)
+ try
+ {
+ // threadIdAndCpuTime is completed by the UDF thread's setup(), which shouldn't take long, so this should return almost immediately
+ threadIdAndCpuTime.get(1, TimeUnit.SECONDS);
+
+ // getThreadCpuTime() reports nanoseconds - convert the consumed CPU time to milliseconds
+ long cpuTimeMillis = (threadMXBean.getThreadCpuTime(threadIdAndCpuTime.threadId) - threadIdAndCpuTime.cpuTime) / 1000000L;
+
+ return future.get(Math.max(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - cpuTimeMillis, 0L),
+ TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e1)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e1);
+ }
+ catch (ExecutionException e1)
+ {
+ Throwable c = e1.getCause();
+ if (c instanceof RuntimeException)
+ throw (RuntimeException) c;
+ throw new RuntimeException(c);
+ }
+ catch (TimeoutException e1)
+ {
+ TimeoutException cause = new TimeoutException(String.format("User defined function %s ran longer than %dms%s",
+ this,
+ DatabaseDescriptor.getUserDefinedFunctionFailTimeout(),
+ DatabaseDescriptor.getUserFunctionTimeoutPolicy() == Config.UserFunctionTimeoutPolicy.ignore
+ ? "" : " - will stop Cassandra VM"));
+ FunctionExecutionException fe = FunctionExecutionException.create(this, cause);
+ JVMStabilityInspector.userFunctionTimeout(cause);
+ throw fe;
+ }
+ }
}
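
The cascade above amounts to three bounded waits. Assuming, purely for illustration, warn-timeout = 500ms and fail-timeout = 1500ms: the first get() waits up to 500ms and on timeout logs and emits the client warning; the second waits up to 1500 - 500 = 1000ms more; if that also times out but the UDF thread has only burned, say, 200ms of CPU time (it may have been stalled by GC rather than looping), the final get() grants up to 1500 - 200 = 1300ms before the fail-timeout policy decides between raising an error and stopping the VM.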
+ private List<ByteBuffer> makeEmptyParametersNull(List<ByteBuffer> parameters)
+ {
+ List<ByteBuffer> r = new ArrayList<>(parameters.size());
+ for (int i = 0; i < parameters.size(); i++)
+ {
+ ByteBuffer param = parameters.get(i);
+ r.add(UDHelper.isNullOrEmpty(argTypes.get(i), param)
+ ? null : param);
+ }
+ return r;
+ }
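
In effect, an argument that arrives as a zero-length buffer where the CQL type admits no empty representation is handed to the UDF body as a plain Java null, so user code can test for absence with a simple null check instead of inspecting buffer lengths.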
+
+ protected abstract ExecutorService executor();
+
public boolean isCallableWrtNullable(List<ByteBuffer> parameters)
{
if (!calledOnNullInput)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 0735103,edc092d..d11d2c5
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@@ -220,22 -186,11 +220,22 @@@ public class CreateIndexStatement exten
}
else
{
- cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+ indexOptions = Collections.emptyMap();
+ kind = cfm.isCompound() ? IndexMetadata.Kind.COMPOSITES : IndexMetadata.Kind.KEYS;
}
- cd.setIndexName(indexName);
- cfm.addDefaultIndexNames();
+ IndexMetadata index = IndexMetadata.fromIndexTargets(cfm, targets, acceptedName, kind, indexOptions);
+
+ // check to disallow creation of an index which duplicates an existing one in all but name
+ Optional<IndexMetadata> existingIndex = Iterables.tryFind(cfm.getIndexes(), existing -> existing.equalsWithoutName(index));
+ if (existingIndex.isPresent())
+ throw new InvalidRequestException(String.format("Index %s is a duplicate of existing index %s",
+ index.name,
+ existingIndex.get().name));
+
- logger.debug("Updating index definition for {}", indexName);
++ logger.trace("Updating index definition for {}", indexName);
+ cfm.indexes(cfm.getIndexes().with(index));
+
MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly);
return true;
}
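
The duplicate check above refuses a second index that matches an existing one in everything but name; with a hypothetical table ks.t:

    CREATE INDEX idx_one ON ks.t (v);
    CREATE INDEX idx_two ON ks.t (v);
    -- rejected: "Index idx_two is a duplicate of existing index idx_one"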
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a9a8f80,4b418b4..062eb0a
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -244,10 -214,10 +244,10 @@@ public class ColumnFamilyStore implemen
void scheduleFlush()
{
- int period = metadata.getMemtableFlushPeriod();
+ int period = metadata.params.memtableFlushPeriodInMs;
if (period > 0)
{
- logger.debug("scheduling flush in {} ms", period);
+ logger.trace("scheduling flush in {} ms", period);
WrappedRunnable runnable = new WrappedRunnable()
{
protected void runMayThrow() throws Exception
@@@ -422,7 -422,7 +422,7 @@@
{
throw new RuntimeException(e);
}
- logger.debug("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry);
- logger.trace("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
++ logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry);
latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
{
public void run()
@@@ -564,14 -543,36 +564,14 @@@
{
Directories directories = new Directories(metadata);
- // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
+ // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
clearEphemeralSnapshots(directories);
- logger.debug("Removing temporary or obsoleted files from unfinished operations for table", metadata.cfName);
- // remove any left-behind SSTables from failed/stalled streaming
- FileFilter filter = new FileFilter()
- {
- public boolean accept(File pathname)
- {
- return pathname.getPath().endsWith(StreamLockfile.FILE_EXT);
- }
- };
- for (File dir : directories.getCFDirectories())
- {
- File[] lockfiles = dir.listFiles(filter);
- // lock files can be null if I/O error happens
- if (lockfiles == null || lockfiles.length == 0)
- continue;
- logger.info("Removing SSTables from failed streaming session. Found {} files to cleanup.", lockfiles.length);
-
- for (File lockfile : lockfiles)
- {
- StreamLockfile streamLockfile = new StreamLockfile(lockfile);
- streamLockfile.cleanup();
- streamLockfile.delete();
- }
- }
-
- logger.trace("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", metadata.cfName);
++ logger.trace("Removing temporary or obsoleted files from unfinished operations for table", metadata.cfName);
+ LifecycleTransaction.removeUnfinishedLeftovers(metadata);
- logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName);
- for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
++ logger.trace("Further extra check for orphan sstable files for {}", metadata.cfName);
+ for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
Set<Component> components = sstableFiles.getValue();
@@@ -813,16 -904,19 +813,16 @@@
onHeapTotal += memtable.getAllocator().onHeap().owns();
offHeapTotal += memtable.getAllocator().offHeap().owns();
- for (SecondaryIndex index : indexManager.getIndexes())
+ for (ColumnFamilyStore indexCfs : indexManager.getAllIndexColumnFamilyStores())
{
- if (index.getIndexCfs() != null)
- {
- MemtableAllocator allocator = index.getIndexCfs().getTracker().getView().getCurrentMemtable().getAllocator();
- onHeapRatio += allocator.onHeap().ownershipRatio();
- offHeapRatio += allocator.offHeap().ownershipRatio();
- onHeapTotal += allocator.onHeap().owns();
- offHeapTotal += allocator.offHeap().owns();
- }
+ MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
+ onHeapRatio += allocator.onHeap().ownershipRatio();
+ offHeapRatio += allocator.offHeap().ownershipRatio();
+ onHeapTotal += allocator.onHeap().owns();
+ offHeapTotal += allocator.offHeap().owns();
}
- logger.info("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap",
+ logger.debug("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap",
onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100));
}
@@@ -1152,9 -1341,9 +1152,9 @@@
* @return sstables whose key range overlaps with that of the given sstables, not including itself.
* (The given sstables may or may not overlap with each other.)
*/
- public Collection<SSTableReader> getOverlappingSSTables(Iterable<SSTableReader> sstables)
+ public Collection<SSTableReader> getOverlappingSSTables(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
{
- logger.debug("Checking for sstables overlapping {}", sstables);
+ logger.trace("Checking for sstables overlapping {}", sstables);
// a normal compaction won't ever have an empty sstables list, but we create a skeleton
// compaction controller for streaming, and that passes an empty list.
@@@ -1636,10 -2409,10 +1636,10 @@@
SSTableReader sstable = active.get(entries.getKey().generation);
if (sstable == null || !refs.tryRef(sstable))
{
- if (logger.isDebugEnabled())
- logger.debug("using snapshot sstable {}", entries.getKey());
+ if (logger.isTraceEnabled())
+ logger.trace("using snapshot sstable {}", entries.getKey());
// open without tracking hotness
- sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false);
+ sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, true, false);
refs.tryRef(sstable);
// release the self ref as we never add the snapshot sstable to DataTracker where it is otherwise released
sstable.selfRef().release();
@@@ -1854,9 -2634,9 +1854,9 @@@
// beginning if we restart before they [the CL segments] are discarded for
// normal reasons post-truncate. To prevent this, we store truncation
// position in the System keyspace.
- logger.debug("truncating {}", name);
+ logger.trace("truncating {}", name);
- if (keyspace.getMetadata().durableWrites || DatabaseDescriptor.isAutoSnapshot())
+ if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
{
// flush the CF being truncated before forcing the new segment
forceBlockingFlush();
@@@ -1887,47 -2670,30 +1887,47 @@@
ReplayPosition replayAfter = discardSSTables(truncatedAt);
- for (SecondaryIndex index : indexManager.getIndexes())
- index.truncateBlocking(truncatedAt);
+ indexManager.truncateAllIndexesBlocking(truncatedAt);
+
+ viewManager.truncateBlocking(truncatedAt);
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
- logger.debug("cleaning out row cache");
+ logger.trace("cleaning out row cache");
invalidateCaches();
}
};
- runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
+ runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
- logger.debug("truncate complete");
+ logger.trace("truncate complete");
}
- public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+ /**
+ * Drops the current memtable without flushing to disk. This should only be called when truncating a column family that does not have durable writes.
+ */
+ public void dumpMemtable()
+ {
+ synchronized (data)
+ {
+ final Flush flush = new Flush(true);
+ flushExecutor.execute(flush);
+ postFlushExecutor.submit(flush.postFlush);
+ }
+ }
+
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
synchronized (this)
{
- logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
+ logger.trace("Cancelling in-progress compactions for {}", metadata.cfName);
- Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
- for (ColumnFamilyStore cfs : selfWithIndexes)
- cfs.getCompactionStrategy().pause();
+ Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
+ ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
+ : concatWithIndexes();
+
+ for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+ cfs.getCompactionStrategyManager().pause();
try
{
// interrupt in-progress compactions