You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:21 UTC
[15/37] cassandra git commit: Make TableMetadata immutable,
optimize Schema
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ee1ba6a..cf7f3d3 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -27,7 +27,7 @@ import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Striped;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.tracing.Tracing;
@@ -41,7 +41,7 @@ public class PaxosState
private final Commit accepted;
private final Commit mostRecentCommit;
- public PaxosState(DecoratedKey key, CFMetaData metadata)
+ public PaxosState(DecoratedKey key, TableMetadata metadata)
{
this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
}
@@ -92,7 +92,7 @@ public class PaxosState
}
finally
{
- Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepare.addNano(System.nanoTime() - start);
+ Keyspace.open(toPrepare.update.metadata().keyspace).getColumnFamilyStore(toPrepare.update.metadata().id).metric.casPrepare.addNano(System.nanoTime() - start);
}
}
@@ -127,7 +127,7 @@ public class PaxosState
}
finally
{
- Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casPropose.addNano(System.nanoTime() - start);
+ Keyspace.open(proposal.update.metadata().keyspace).getColumnFamilyStore(proposal.update.metadata().id).metric.casPropose.addNano(System.nanoTime() - start);
}
}
@@ -143,7 +143,7 @@ public class PaxosState
// erase the in-progress update.
// The table may have been truncated since the proposal was initiated. In that case, we
// don't want to perform the mutation and potentially resurrect truncated data
- if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().cfId))
+ if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().id))
{
Tracing.trace("Committing proposal {}", proposal);
Mutation mutation = proposal.makeMutation();
@@ -158,7 +158,7 @@ public class PaxosState
}
finally
{
- Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime() - start);
+ Keyspace.open(proposal.update.metadata().keyspace).getColumnFamilyStore(proposal.update.metadata().id).metric.casCommit.addNano(System.nanoTime() - start);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 5915eab..381c498 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -28,12 +28,13 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.UUIDGen;
@@ -49,7 +50,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
- public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
+ public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
{
super(targets, consistency, queryStartNanoTime);
// need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
@@ -89,7 +90,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
latch.countDown();
}
- public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec)
+ public Iterable<InetAddress> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec)
{
// In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see
// coment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fab9372..0eee6f0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
import com.ning.compress.lzf.LZFInputStream;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -42,22 +42,19 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.io.util.TrackedInputStream;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
/**
* StreamReader reads from stream and writes to SSTable.
*/
public class StreamReader
{
private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
- protected final UUID cfId;
+ protected final TableId tableId;
protected final long estimatedKeys;
protected final Collection<Pair<Long, Long>> sections;
protected final StreamSession session;
@@ -71,7 +68,7 @@ public class StreamReader
public StreamReader(FileMessageHeader header, StreamSession session)
{
this.session = session;
- this.cfId = header.cfId;
+ this.tableId = header.tableId;
this.estimatedKeys = header.estimatedKeys;
this.sections = header.sections;
this.inputVersion = header.version;
@@ -92,15 +89,11 @@ public class StreamReader
{
long totalSize = totalSize();
- Pair<String, String> kscf = Schema.instance.getCF(cfId);
- ColumnFamilyStore cfs = null;
- if (kscf != null)
- cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
- if (kscf == null || cfs == null)
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ if (cfs == null)
{
// schema was dropped during streaming
- throw new IOException("CF " + cfId + " was dropped during streaming");
+ throw new IOException("CF " + tableId + " was dropped during streaming");
}
logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
@@ -108,7 +101,7 @@ public class StreamReader
cfs.getColumnFamilyName());
TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
- StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()),
totalSize, session.planId());
SSTableMultiWriter writer = null;
try
@@ -142,7 +135,7 @@ public class StreamReader
}
}
- protected SerializationHeader getHeader(CFMetaData metadata)
+ protected SerializationHeader getHeader(TableMetadata metadata)
{
return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader
}
@@ -153,7 +146,7 @@ public class StreamReader
if (localDir == null)
throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata));
+ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
return writer;
}
@@ -181,7 +174,7 @@ public class StreamReader
public static final String BUFFER_FILE_PREFIX = "buf";
public static final String BUFFER_FILE_SUFFIX = "dat";
- private final CFMetaData metadata;
+ private final TableMetadata metadata;
private final DataInputPlus in;
private final SerializationHeader header;
private final SerializationHelper helper;
@@ -192,7 +185,7 @@ public class StreamReader
private Row staticRow;
private IOException exception;
- public StreamDeserializer(CFMetaData metadata, InputStream in, Version version, SerializationHeader header,
+ public StreamDeserializer(TableMetadata metadata, InputStream in, Version version, SerializationHeader header,
long totalSize, UUID sessionId) throws IOException
{
this.metadata = metadata;
@@ -203,22 +196,22 @@ public class StreamReader
public StreamDeserializer newPartition() throws IOException
{
- key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in));
+ key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
staticRow = iterator.readStaticRow();
return this;
}
- public CFMetaData metadata()
+ public TableMetadata metadata()
{
return metadata;
}
- public PartitionColumns columns()
+ public RegularAndStaticColumns columns()
{
// We don't know which columns we'll get so assume it can be all of them
- return metadata.partitionColumns();
+ return metadata.regularAndStaticColumns();
}
public boolean isReverseOrder()
@@ -308,13 +301,13 @@ public class StreamReader
}
}
- private static File getTempBufferFile(CFMetaData metadata, long totalSize, UUID sessionId) throws IOException
+ private static File getTempBufferFile(TableMetadata metadata, long totalSize, UUID sessionId) throws IOException
{
- ColumnFamilyStore cfs = Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName);
+ ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
if (cfs == null)
{
// schema was dropped during streaming
- throw new RuntimeException(String.format("CF %s.%s was dropped during streaming", metadata.ksName, metadata.cfName));
+ throw new RuntimeException(String.format("Table %s was dropped during streaming", metadata.toString()));
}
long maxSize = Math.min(MAX_SPILL_FILE_SIZE, totalSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6c60b74..d0c4d50 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -30,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
@@ -45,8 +43,8 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -75,9 +73,9 @@ public class StreamReceiveTask extends StreamTask
private int remoteSSTablesReceived = 0;
- public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
+ public StreamReceiveTask(StreamSession session, TableId tableId, int totalFiles, long totalSize)
{
- super(session, cfId);
+ super(session, tableId);
this.totalFiles = totalFiles;
this.totalSize = totalSize;
// this is an "offline" transaction, as we currently manually expose the sstables once done;
@@ -102,7 +100,7 @@ public class StreamReceiveTask extends StreamTask
}
remoteSSTablesReceived++;
- assert cfId.equals(sstable.getCfId());
+ assert tableId.equals(sstable.getTableId());
Collection<SSTableReader> finished = null;
try
@@ -136,7 +134,7 @@ public class StreamReceiveTask extends StreamTask
public synchronized LifecycleTransaction getTransaction()
{
if (done)
- throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId));
+ throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), tableId));
return txn;
}
@@ -156,8 +154,8 @@ public class StreamReceiveTask extends StreamTask
ColumnFamilyStore cfs = null;
try
{
- Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
- if (kscf == null)
+ cfs = ColumnFamilyStore.getIfExists(task.tableId);
+ if (cfs == null)
{
// schema was dropped during streaming
task.sstables.clear();
@@ -165,9 +163,8 @@ public class StreamReceiveTask extends StreamTask
task.session.taskCompleted(task);
return;
}
- cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- hasCDC = cfs.metadata.params.cdc;
+ hasViews = !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
+ hasCDC = cfs.metadata().params.cdc;
Collection<SSTableReader> readers = task.sstables;
@@ -193,7 +190,7 @@ public class StreamReceiveTask extends StreamTask
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
+ Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata())));
// MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
// before transaction is done.
@@ -215,7 +212,7 @@ public class StreamReceiveTask extends StreamTask
cfs.indexManager.buildAllIndexesBlocking(readers);
//invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
{
List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
@@ -230,7 +227,7 @@ public class StreamReceiveTask extends StreamTask
cfs.keyspace.getName(), cfs.getTableName());
}
- if (cfs.metadata.isCounter())
+ if (cfs.metadata().isCounter())
{
int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
if (invalidatedKeys > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 736d30f..faa05d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.CassandraVersion;
@@ -147,9 +148,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
// streaming tasks are created and managed per ColumnFamily ID
@VisibleForTesting
- protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap<>();
// data receivers, filled after receiving prepare message
- private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
+ private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
/* can be null when session is created in remote */
private final StreamConnectionFactory factory;
@@ -223,10 +224,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
- public LifecycleTransaction getTransaction(UUID cfId)
+ public LifecycleTransaction getTransaction(TableId tableId)
{
- assert receivers.containsKey(cfId);
- return receivers.get(cfId).getTransaction();
+ assert receivers.containsKey(tableId);
+ return receivers.get(tableId).getTransaction();
}
private boolean isKeepAliveSupported()
@@ -424,13 +425,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
continue;
}
- UUID cfId = details.ref.get().metadata.cfId;
- StreamTransferTask task = transfers.get(cfId);
+ TableId tableId = details.ref.get().metadata().id;
+ StreamTransferTask task = transfers.get(tableId);
if (task == null)
{
//guarantee atomicity
- StreamTransferTask newTask = new StreamTransferTask(this, cfId);
- task = transfers.putIfAbsent(cfId, newTask);
+ StreamTransferTask newTask = new StreamTransferTask(this, tableId);
+ task = transfers.putIfAbsent(tableId, newTask);
if (task == null)
task = newTask;
}
@@ -525,7 +526,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
case RECEIVED:
ReceivedMessage received = (ReceivedMessage) message;
- received(received.cfId, received.sequenceNumber);
+ received(received.tableId, received.sequenceNumber);
break;
case COMPLETE:
@@ -634,7 +635,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
StreamingMetrics.totalOutgoingBytes.inc(headerSize);
metrics.outgoingBytes.inc(headerSize);
// schedule timeout for receiving ACK
- StreamTransferTask task = transfers.get(header.cfId);
+ StreamTransferTask task = transfers.get(header.tableId);
if (task != null)
{
task.scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS);
@@ -652,8 +653,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
StreamingMetrics.totalIncomingBytes.inc(headerSize);
metrics.incomingBytes.inc(headerSize);
// send back file received message
- handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
- receivers.get(message.header.cfId).received(message.sstable);
+ handler.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
+ receivers.get(message.header.tableId).received(message.sstable);
}
public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
@@ -662,9 +663,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
streamResult.handleProgress(progress);
}
- public void received(UUID cfId, int sequenceNumber)
+ public void received(TableId tableId, int sequenceNumber)
{
- transfers.get(cfId).complete(sequenceNumber);
+ transfers.get(tableId).complete(sequenceNumber);
}
/**
@@ -723,13 +724,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public synchronized void taskCompleted(StreamReceiveTask completedTask)
{
- receivers.remove(completedTask.cfId);
+ receivers.remove(completedTask.tableId);
maybeCompleted();
}
public synchronized void taskCompleted(StreamTransferTask completedTask)
{
- transfers.remove(completedTask.cfId);
+ transfers.remove(completedTask.tableId);
maybeCompleted();
}
@@ -793,7 +794,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
failIfFinished();
if (summary.files > 0)
- receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
+ receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
}
private void startStreamingFiles()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java
index c427283..0d94f57 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSummary.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.UUIDSerializer;
/**
@@ -37,7 +38,7 @@ public class StreamSummary implements Serializable
{
public static final IVersionedSerializer<StreamSummary> serializer = new StreamSummarySerializer();
- public final UUID cfId;
+ public final TableId tableId;
/**
* Number of files to transfer. Can be 0 if nothing to transfer for some streaming request.
@@ -45,9 +46,9 @@ public class StreamSummary implements Serializable
public final int files;
public final long totalSize;
- public StreamSummary(UUID cfId, int files, long totalSize)
+ public StreamSummary(TableId tableId, int files, long totalSize)
{
- this.cfId = cfId;
+ this.tableId = tableId;
this.files = files;
this.totalSize = totalSize;
}
@@ -58,20 +59,20 @@ public class StreamSummary implements Serializable
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamSummary summary = (StreamSummary) o;
- return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
+ return files == summary.files && totalSize == summary.totalSize && tableId.equals(summary.tableId);
}
@Override
public int hashCode()
{
- return Objects.hashCode(cfId, files, totalSize);
+ return Objects.hashCode(tableId, files, totalSize);
}
@Override
public String toString()
{
final StringBuilder sb = new StringBuilder("StreamSummary{");
- sb.append("path=").append(cfId);
+ sb.append("path=").append(tableId);
sb.append(", files=").append(files);
sb.append(", totalSize=").append(totalSize);
sb.append('}');
@@ -80,25 +81,24 @@ public class StreamSummary implements Serializable
public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary>
{
- // arbitrary version is fine for UUIDSerializer for now...
public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException
{
- UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version);
+ summary.tableId.serialize(out);
out.writeInt(summary.files);
out.writeLong(summary.totalSize);
}
public StreamSummary deserialize(DataInputPlus in, int version) throws IOException
{
- UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ TableId tableId = TableId.deserialize(in);
int files = in.readInt();
long totalSize = in.readLong();
- return new StreamSummary(cfId, files, totalSize);
+ return new StreamSummary(tableId, files, totalSize);
}
public long serializedSize(StreamSummary summary, int version)
{
- long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version);
+ long size = summary.tableId.serializedSize();
size += TypeSizes.sizeof(summary.files);
size += TypeSizes.sizeof(summary.totalSize);
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java
index ac72cff..1e22c34 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTask.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.streaming;
-import java.util.UUID;
+import org.apache.cassandra.schema.TableId;
/**
* StreamTask is an abstraction of the streaming task performed over specific ColumnFamily.
@@ -27,12 +27,12 @@ public abstract class StreamTask
/** StreamSession that this task belongs */
protected final StreamSession session;
- protected final UUID cfId;
+ protected final TableId tableId;
- protected StreamTask(StreamSession session, UUID cfId)
+ protected StreamTask(StreamSession session, TableId tableId)
{
this.session = session;
- this.cfId = cfId;
+ this.tableId = tableId;
}
/**
@@ -56,6 +56,6 @@ public abstract class StreamTask
*/
public StreamSummary getSummary()
{
- return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize());
+ return new StreamSummary(tableId, getTotalNumberOfFiles(), getTotalSize());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 4f313c3..aa3251b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -27,6 +27,7 @@ import com.google.common.base.Throwables;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -47,14 +48,14 @@ public class StreamTransferTask extends StreamTask
private long totalSize;
- public StreamTransferTask(StreamSession session, UUID cfId)
+ public StreamTransferTask(StreamSession session, TableId tableId)
{
- super(session, cfId);
+ super(session, tableId);
}
public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
- assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
+ assert ref.get() != null && tableId.equals(ref.get().metadata().id);
OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
files.put(message.header.sequenceNumber, message);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 2044d4d..6ac607f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,9 +25,7 @@ import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.util.TrackedInputStream;
@@ -66,15 +64,12 @@ public class CompressedStreamReader extends StreamReader
{
long totalSize = totalSize();
- Pair<String, String> kscf = Schema.instance.getCF(cfId);
- ColumnFamilyStore cfs = null;
- if (kscf != null)
- cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
- if (kscf == null || cfs == null)
+ if (cfs == null)
{
// schema was dropped during streaming
- throw new IOException("CF " + cfId + " was dropped during streaming");
+ throw new IOException("CF " + tableId + " was dropped during streaming");
}
logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
@@ -85,7 +80,7 @@ public class CompressedStreamReader extends StreamReader
ChecksumType.CRC32, cfs::getCrcCheckChance);
TrackedInputStream in = new TrackedInputStream(cis);
- StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()),
totalSize, session.planId());
SSTableMultiWriter writer = null;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
index 9ef23ab..a1f2496 100644
--- a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
@@ -19,18 +19,18 @@ package org.apache.cassandra.streaming.management;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import javax.management.openmbean.*;
import com.google.common.base.Throwables;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.StreamSummary;
/**
*/
public class StreamSummaryCompositeData
{
- private static final String[] ITEM_NAMES = new String[]{"cfId",
+ private static final String[] ITEM_NAMES = new String[]{"tableId",
"files",
"totalSize"};
private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
@@ -60,7 +60,7 @@ public class StreamSummaryCompositeData
public static CompositeData toCompositeData(StreamSummary streamSummary)
{
Map<String, Object> valueMap = new HashMap<>();
- valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
+ valueMap.put(ITEM_NAMES[0], streamSummary.tableId.toString());
valueMap.put(ITEM_NAMES[1], streamSummary.files);
valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
try
@@ -76,7 +76,7 @@ public class StreamSummaryCompositeData
public static StreamSummary fromCompositeData(CompositeData cd)
{
Object[] values = cd.getAll(ITEM_NAMES);
- return new StreamSummary(UUID.fromString((String) values[0]),
+ return new StreamSummary(TableId.fromString((String) values[0]),
(int) values[1],
(long) values[2]);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index b0639ea..a37420b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.TypeSizes;
@@ -30,9 +29,9 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.compress.CompressionInfo;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDSerializer;
/**
* StreamingFileHeader is appended before sending actual data to describe what it's sending.
@@ -41,7 +40,7 @@ public class FileMessageHeader
{
public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
- public final UUID cfId;
+ public final TableId tableId;
public final int sequenceNumber;
/** SSTable version */
public final Version version;
@@ -64,7 +63,7 @@ public class FileMessageHeader
/* cached size value */
private transient final long size;
- public FileMessageHeader(UUID cfId,
+ public FileMessageHeader(TableId tableId,
int sequenceNumber,
Version version,
SSTableFormat.Type format,
@@ -75,7 +74,7 @@ public class FileMessageHeader
int sstableLevel,
SerializationHeader.Component header)
{
- this.cfId = cfId;
+ this.tableId = tableId;
this.sequenceNumber = sequenceNumber;
this.version = version;
this.format = format;
@@ -89,7 +88,7 @@ public class FileMessageHeader
this.size = calculateSize();
}
- public FileMessageHeader(UUID cfId,
+ public FileMessageHeader(TableId tableId,
int sequenceNumber,
Version version,
SSTableFormat.Type format,
@@ -100,7 +99,7 @@ public class FileMessageHeader
int sstableLevel,
SerializationHeader.Component header)
{
- this.cfId = cfId;
+ this.tableId = tableId;
this.sequenceNumber = sequenceNumber;
this.version = version;
this.format = format;
@@ -152,7 +151,7 @@ public class FileMessageHeader
public String toString()
{
final StringBuilder sb = new StringBuilder("Header (");
- sb.append("cfId: ").append(cfId);
+ sb.append("tableId: ").append(tableId);
sb.append(", #").append(sequenceNumber);
sb.append(", version: ").append(version);
sb.append(", format: ").append(format);
@@ -171,13 +170,13 @@ public class FileMessageHeader
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FileMessageHeader that = (FileMessageHeader) o;
- return sequenceNumber == that.sequenceNumber && cfId.equals(that.cfId);
+ return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
}
@Override
public int hashCode()
{
- int result = cfId.hashCode();
+ int result = tableId.hashCode();
result = 31 * result + sequenceNumber;
return result;
}
@@ -186,7 +185,7 @@ public class FileMessageHeader
{
public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
{
- UUIDSerializer.serializer.serialize(header.cfId, out, version);
+ header.tableId.serialize(out);
out.writeInt(header.sequenceNumber);
out.writeUTF(header.version.toString());
out.writeUTF(header.format.name);
@@ -212,7 +211,7 @@ public class FileMessageHeader
public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
{
- UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ TableId tableId = TableId.deserialize(in);
int sequenceNumber = in.readInt();
Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
@@ -227,12 +226,12 @@ public class FileMessageHeader
int sstableLevel = in.readInt();
SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in);
- return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
+ return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
}
public long serializedSize(FileMessageHeader header, int version)
{
- long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
+ long size = header.tableId.serializedSize();
size += TypeSizes.sizeof(header.sequenceNumber);
size += TypeSizes.sizeof(header.version.toString());
size += TypeSizes.sizeof(header.format.name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 6723d17..fba9ec4 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -72,7 +72,7 @@ public class OutgoingFileMessage extends StreamMessage
SSTableReader sstable = ref.get();
filename = sstable.getFilename();
- this.header = new FileMessageHeader(sstable.metadata.cfId,
+ this.header = new FileMessageHeader(sstable.metadata().id,
sequenceNumber,
sstable.descriptor.version,
sstable.descriptor.formatType,
@@ -81,7 +81,7 @@ public class OutgoingFileMessage extends StreamMessage
sstable.compression ? sstable.getCompressionMetadata() : null,
repairedAt,
keepSSTableLevel ? sstable.getSSTableLevel() : 0,
- sstable.header == null ? null : sstable.header.toComponent());
+ sstable.header.toComponent());
}
public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 251b9c8..55dd7e6 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.streaming.messages;
import java.io.*;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
-import java.util.UUID;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.UUIDSerializer;
public class ReceivedMessage extends StreamMessage
{
@@ -37,23 +35,23 @@ public class ReceivedMessage extends StreamMessage
public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
{
DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
- return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
+ return new ReceivedMessage(TableId.deserialize(input), input.readInt());
}
public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
- UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
+ message.tableId.serialize(out);
out.writeInt(message.sequenceNumber);
}
};
- public final UUID cfId;
+ public final TableId tableId;
public final int sequenceNumber;
- public ReceivedMessage(UUID cfId, int sequenceNumber)
+ public ReceivedMessage(TableId tableId, int sequenceNumber)
{
super(Type.RECEIVED);
- this.cfId = cfId;
+ this.tableId = tableId;
this.sequenceNumber = sequenceNumber;
}
@@ -61,7 +59,7 @@ public class ReceivedMessage extends StreamMessage
public String toString()
{
final StringBuilder sb = new StringBuilder("Received (");
- sb.append(cfId).append(", #").append(sequenceNumber).append(')');
+ sb.append(tableId).append(", #").append(sequenceNumber).append(')');
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index dde732a..1d05103 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -30,8 +30,9 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
@@ -46,14 +47,14 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.impl.Indenter;
-import org.codehaus.jackson.util.DefaultPrettyPrinter;
import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.codehaus.jackson.util.DefaultPrettyPrinter;
public final class JsonTransformer
{
@@ -68,7 +69,7 @@ public final class JsonTransformer
private final CompactIndenter arrayIndenter = new CompactIndenter();
- private final CFMetaData metadata;
+ private final TableMetadata metadata;
private final ISSTableScanner currentScanner;
@@ -76,7 +77,7 @@ public final class JsonTransformer
private long currentPosition = 0;
- private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, CFMetaData metadata)
+ private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, TableMetadata metadata)
{
this.json = json;
this.metadata = metadata;
@@ -89,7 +90,7 @@ public final class JsonTransformer
json.setPrettyPrinter(prettyPrinter);
}
- public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, CFMetaData metadata, OutputStream out)
+ public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, TableMetadata metadata, OutputStream out)
throws IOException
{
try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, StandardCharsets.UTF_8)))
@@ -101,7 +102,7 @@ public final class JsonTransformer
}
}
- public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, CFMetaData metadata, OutputStream out) throws IOException
+ public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, TableMetadata metadata, OutputStream out) throws IOException
{
try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, StandardCharsets.UTF_8)))
{
@@ -119,7 +120,7 @@ public final class JsonTransformer
private void serializePartitionKey(DecoratedKey key)
{
- AbstractType<?> keyValidator = metadata.getKeyValidator();
+ AbstractType<?> keyValidator = metadata.partitionKeyType;
objectIndenter.setCompact(true);
try
{
@@ -223,7 +224,7 @@ public final class JsonTransformer
}
catch (IOException e)
{
- String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey());
+ String key = metadata.partitionKeyType.getString(partition.partitionKey().getKey());
logger.error("Fatal error parsing partition: {}", key, e);
}
}
@@ -334,10 +335,10 @@ public final class JsonTransformer
objectIndenter.setCompact(true);
json.writeStartArray();
arrayIndenter.setCompact(true);
- List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns();
+ List<ColumnMetadata> clusteringColumns = metadata.clusteringColumns();
for (int i = 0; i < clusteringColumns.size(); i++)
{
- ColumnDefinition column = clusteringColumns.get(i);
+ ColumnMetadata column = clusteringColumns.get(i);
if (i >= clustering.size())
{
json.writeString("*");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
index 1f407cb..56c57d9 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.tools;
-import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.HashSet;
@@ -27,8 +26,8 @@ import java.util.Set;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
@@ -46,7 +45,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
*/
public class SSTableExpiredBlockers
{
- public static void main(String[] args) throws IOException
+ public static void main(String[] args)
{
PrintStream out = System.out;
if (args.length < 2)
@@ -61,11 +60,7 @@ public class SSTableExpiredBlockers
String columnfamily = args[args.length - 1];
Schema.instance.loadFromDisk(false);
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnfamily);
- if (metadata == null)
- throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
- keyspace,
- columnfamily));
+ TableMetadata metadata = Schema.instance.validateTable(keyspace, columnfamily);
Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 52d5ecf..913ee1f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -27,7 +27,7 @@ import java.util.stream.StreamSupport;
import org.apache.commons.cli.*;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.DecoratedKey;
@@ -43,6 +43,7 @@ import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -88,10 +89,10 @@ public class SSTableExport
* Construct table schema from info stored in SSTable's Stats.db
*
* @param desc SSTable's descriptor
- * @return Restored CFMetaData
+ * @return Restored TableMetadata
* @throws IOException when Stats.db cannot be read
*/
- public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException
+ public static TableMetadata metadataFromSSTable(Descriptor desc) throws IOException
{
if (!desc.version.isCompatible())
throw new IOException("Cannot process old and unsupported SSTable version.");
@@ -101,7 +102,7 @@ public class SSTableExport
SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
IPartitioner partitioner = FBUtilities.newPartitioner(desc);
- CFMetaData.Builder builder = CFMetaData.Builder.create("keyspace", "table").withPartitioner(partitioner);
+ TableMetadata.Builder builder = TableMetadata.builder("keyspace", "table").partitioner(partitioner);
header.getStaticColumns().entrySet().stream()
.forEach(entry -> {
ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true);
@@ -112,7 +113,7 @@ public class SSTableExport
ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true);
builder.addRegularColumn(ident, entry.getValue());
});
- builder.addPartitionKey("PartitionKey", header.getKeyType());
+ builder.addPartitionKeyColumn("PartitionKey", header.getKeyType());
for (int i = 0; i < header.getClusteringTypes().size(); i++)
{
builder.addClusteringColumn("clustering" + (i > 0 ? i : ""), header.getClusteringTypes().get(i));
@@ -170,7 +171,7 @@ public class SSTableExport
Descriptor desc = Descriptor.fromFilename(ssTableFileName);
try
{
- CFMetaData metadata = metadataFromSSTable(desc);
+ TableMetadata metadata = metadataFromSSTable(desc);
if (cmd.hasOption(ENUMERATE_KEYS_OPTION))
{
try (KeyIterator iter = new KeyIterator(desc, metadata))
@@ -183,14 +184,14 @@ public class SSTableExport
}
else
{
- SSTableReader sstable = SSTableReader.openNoValidation(desc, metadata);
+ SSTableReader sstable = SSTableReader.openNoValidation(desc, TableMetadataRef.forOfflineTools(metadata));
IPartitioner partitioner = sstable.getPartitioner();
final ISSTableScanner currentScanner;
if ((keys != null) && (keys.length > 0))
{
List<AbstractBounds<PartitionPosition>> bounds = Arrays.stream(keys)
.filter(key -> !excludes.contains(key))
- .map(metadata.getKeyValidator()::fromString)
+ .map(metadata.partitionKeyType::fromString)
.map(partitioner::decorateKey)
.sorted()
.map(DecoratedKey::getToken)
@@ -202,7 +203,7 @@ public class SSTableExport
currentScanner = sstable.getScanner();
}
Stream<UnfilteredRowIterator> partitions = iterToStream(currentScanner).filter(i ->
- excludes.isEmpty() || !excludes.contains(metadata.getKeyValidator().getString(i.partitionKey().getKey()))
+ excludes.isEmpty() || !excludes.contains(metadata.partitionKeyType.getString(i.partitionKey().getKey()))
);
if (cmd.hasOption(DEBUG_OUTPUT_OPTION))
{
@@ -213,19 +214,19 @@ public class SSTableExport
if (!partition.partitionLevelDeletion().isLive())
{
- System.out.println("[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" +
+ System.out.println("[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" +
position.get() + " " + partition.partitionLevelDeletion());
}
if (!partition.staticRow().isEmpty())
{
- System.out.println("[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" +
+ System.out.println("[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" +
position.get() + " " + partition.staticRow().toString(metadata, true));
}
partition.forEachRemaining(row ->
{
System.out.println(
- "[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@"
- + position.get() + " " + row.toString(metadata, false, true));
+ "[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@"
+ + position.get() + " " + row.toString(metadata, false, true));
position.set(currentScanner.getCurrentPosition());
});
});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index 915edf1..3a66ef9 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -21,7 +21,7 @@ import java.io.PrintStream;
import java.util.Map;
import java.util.Set;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
@@ -69,7 +69,7 @@ public class SSTableLevelResetter
String keyspaceName = args[1];
String columnfamily = args[2];
// validate columnfamily
- if (Schema.instance.getCFMetaData(keyspaceName, columnfamily) == null)
+ if (Schema.instance.getTableMetadataRef(keyspaceName, columnfamily) == null)
{
System.err.println("ColumnFamily not found: " + keyspaceName + "/" + columnfamily);
System.exit(1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index 9f0395b..1116575 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -34,7 +34,7 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
@@ -92,7 +92,7 @@ public class SSTableOfflineRelevel
String columnfamily = args[args.length - 1];
Schema.instance.loadFromDisk(false);
- if (Schema.instance.getCFMetaData(keyspace, columnfamily) == null)
+ if (Schema.instance.getTableMetadataRef(keyspace, columnfamily) == null)
throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s",
keyspace,
columnfamily));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
index 2e8ee0b..adfe7e0 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
@@ -18,9 +18,8 @@
*/
package org.apache.cassandra.tools;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -52,7 +51,7 @@ public class StandaloneSSTableUtil
Util.initDatabaseDescriptor();
Schema.instance.loadFromDisk(false);
- CFMetaData metadata = Schema.instance.getCFMetaData(options.keyspaceName, options.cfName);
+ TableMetadata metadata = Schema.instance.getTableMetadata(options.keyspaceName, options.cfName);
if (metadata == null)
throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
options.keyspaceName,
@@ -82,7 +81,7 @@ public class StandaloneSSTableUtil
}
}
- private static void listFiles(Options options, CFMetaData metadata, OutputHandler handler) throws IOException
+ private static void listFiles(Options options, TableMetadata metadata, OutputHandler handler) throws IOException
{
Directories directories = new Directories(metadata, ColumnFamilyStore.getInitialDirectories());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 54b340e..f7f48c8 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.cli.*;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
@@ -61,7 +61,7 @@ public class StandaloneScrubber
// load keyspace descriptions.
Schema.instance.loadFromDisk(false);
- if (Schema.instance.getKSMetaData(options.keyspaceName) == null)
+ if (Schema.instance.getKeyspaceMetadata(options.keyspaceName) == null)
throw new IllegalArgumentException(String.format("Unknown keyspace %s", options.keyspaceName));
// Do not load sstables since they might be broken
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index aaaa9db..c5be02e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -22,7 +22,7 @@ import java.io.File;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.cli.*;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index e55b3a8..ed25e42 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.*;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
@@ -55,7 +55,7 @@ public class StandaloneUpgrader
// load keyspace descriptions.
Schema.instance.loadFromDisk(false);
- if (Schema.instance.getCFMetaData(options.keyspace, options.cf) == null)
+ if (Schema.instance.getTableMetadataRef(options.keyspace, options.cf) == null)
throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
options.keyspace,
options.cf));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index ee55dd5..40dfbf7 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -18,7 +18,7 @@
*/
package org.apache.cassandra.tools;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
@@ -56,7 +56,7 @@ public class StandaloneVerifier
boolean hasFailed = false;
- if (Schema.instance.getCFMetaData(options.keyspaceName, options.cfName) == null)
+ if (Schema.instance.getTableMetadataRef(options.keyspaceName, options.cfName) == null)
throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
options.keyspaceName,
options.cfName));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
index c964b2f..4102846 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import io.airlift.command.Option;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 350601a..48f929f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -31,7 +31,7 @@ import java.util.Set;
import com.google.common.collect.Sets;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.tools.NodeProbe;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index ac8b4f7..20c992c 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -20,18 +20,23 @@ package org.apache.cassandra.tracing;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Tables;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
+import static java.lang.String.format;
+
public final class TraceKeyspace
{
private TraceKeyspace()
@@ -41,36 +46,41 @@ public final class TraceKeyspace
public static final String SESSIONS = "sessions";
public static final String EVENTS = "events";
- private static final CFMetaData Sessions =
- compile(SESSIONS,
- "tracing sessions",
- "CREATE TABLE %s ("
- + "session_id uuid,"
- + "command text,"
- + "client inet,"
- + "coordinator inet,"
- + "duration int,"
- + "parameters map<text, text>,"
- + "request text,"
- + "started_at timestamp,"
- + "PRIMARY KEY ((session_id)))");
-
- private static final CFMetaData Events =
- compile(EVENTS,
- "tracing events",
- "CREATE TABLE %s ("
- + "session_id uuid,"
- + "event_id timeuuid,"
- + "activity text,"
- + "source inet,"
- + "source_elapsed int,"
- + "thread text,"
- + "PRIMARY KEY ((session_id), event_id))");
-
- private static CFMetaData compile(String name, String description, String schema)
+ private static final TableMetadata Sessions =
+ parse(SESSIONS,
+ "tracing sessions",
+ "CREATE TABLE %s ("
+ + "session_id uuid,"
+ + "command text,"
+ + "client inet,"
+ + "coordinator inet,"
+ + "duration int,"
+ + "parameters map<text, text>,"
+ + "request text,"
+ + "started_at timestamp,"
+ + "PRIMARY KEY ((session_id)))");
+
+ private static final TableMetadata Events =
+ parse(EVENTS,
+ "tracing events",
+ "CREATE TABLE %s ("
+ + "session_id uuid,"
+ + "event_id timeuuid,"
+ + "activity text,"
+ + "source inet,"
+ + "source_elapsed int,"
+ + "thread text,"
+ + "PRIMARY KEY ((session_id), event_id))");
+
+ private static TableMetadata parse(String table, String description, String cql)
{
- return CFMetaData.compile(String.format(schema, name), SchemaConstants.TRACE_KEYSPACE_NAME)
- .comment(description);
+ return CreateTableStatement.parse(format(cql, table), SchemaConstants.TRACE_KEYSPACE_NAME)
+ .id(TableId.forSystemTable(SchemaConstants.TRACE_KEYSPACE_NAME, table))
+ .dcLocalReadRepairChance(0.0)
+ .gcGraceSeconds(0)
+ .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1))
+ .comment(description)
+ .build();
}
public static KeyspaceMetadata metadata()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 1eeecac..f38d83d 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -49,6 +49,8 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaChangeListener;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.transport.messages.EventMessage;
@@ -100,7 +102,7 @@ public class Server implements CassandraDaemon.Server
eventExecutorGroup = builder.eventExecutorGroup;
EventNotifier notifier = new EventNotifier(this);
StorageService.instance.register(notifier);
- MigrationManager.instance.register(notifier);
+ Schema.instance.registerListener(notifier);
}
public void stop()
@@ -448,7 +450,7 @@ public class Server implements CassandraDaemon.Server
}
}
- private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
+ private static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
{
private final Server server;
@@ -584,12 +586,12 @@ public class Server implements CassandraDaemon.Server
send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
}
- public void onCreateColumnFamily(String ksName, String cfName)
+ public void onCreateTable(String ksName, String cfName)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}
- public void onCreateUserType(String ksName, String typeName)
+ public void onCreateType(String ksName, String typeName)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
@@ -606,28 +608,28 @@ public class Server implements CassandraDaemon.Server
ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
}
- public void onUpdateKeyspace(String ksName)
+ public void onAlterKeyspace(String ksName)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
}
- public void onUpdateColumnFamily(String ksName, String cfName, boolean affectsStatements)
+ public void onAlterTable(String ksName, String cfName, boolean affectsStatements)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}
- public void onUpdateUserType(String ksName, String typeName)
+ public void onAlterType(String ksName, String typeName)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
- public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ public void onAlterFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
}
- public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ public void onAlterAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
@@ -638,12 +640,12 @@ public class Server implements CassandraDaemon.Server
send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
}
- public void onDropColumnFamily(String ksName, String cfName)
+ public void onDropTable(String ksName, String cfName)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}
- public void onDropUserType(String ksName, String typeName)
+ public void onDropType(String ksName, String typeName)
{
send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 703e69a..906b342 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.CassandraException;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TriggerMetadata;
import org.apache.cassandra.schema.Triggers;
import org.apache.cassandra.utils.FBUtilities;
@@ -86,7 +87,7 @@ public class TriggerExecutor
if (intermediate == null || intermediate.isEmpty())
return updates;
- return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().cfId, updates.partitionKey(), intermediate));
+ return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().id, updates.partitionKey(), intermediate));
}
/**
@@ -157,9 +158,9 @@ public class TriggerExecutor
return merged;
}
- private List<PartitionUpdate> validateForSinglePartition(UUID cfId,
- DecoratedKey key,
- Collection<Mutation> tmutations)
+ private List<PartitionUpdate> validateForSinglePartition(TableId tableId,
+ DecoratedKey key,
+ Collection<Mutation> tmutations)
throws InvalidRequestException
{
validate(tmutations);
@@ -169,7 +170,7 @@ public class TriggerExecutor
List<PartitionUpdate> updates = Lists.newArrayList(Iterables.getOnlyElement(tmutations).getPartitionUpdates());
if (updates.size() > 1)
throw new InvalidRequestException("The updates generated by triggers are not all for the same partition");
- validateSamePartition(cfId, key, Iterables.getOnlyElement(updates));
+ validateSamePartition(tableId, key, Iterables.getOnlyElement(updates));
return updates;
}
@@ -178,20 +179,20 @@ public class TriggerExecutor
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
- validateSamePartition(cfId, key, update);
+ validateSamePartition(tableId, key, update);
updates.add(update);
}
}
return updates;
}
- private void validateSamePartition(UUID cfId, DecoratedKey key, PartitionUpdate update)
+ private void validateSamePartition(TableId tableId, DecoratedKey key, PartitionUpdate update)
throws InvalidRequestException
{
if (!key.equals(update.partitionKey()))
throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");
- if (!cfId.equals(update.metadata().cfId))
+ if (!tableId.equals(update.metadata().id))
throw new InvalidRequestException("table of additional mutation does not match primary update table");
}
@@ -211,7 +212,7 @@ public class TriggerExecutor
*/
private List<Mutation> executeInternal(PartitionUpdate update)
{
- Triggers triggers = update.metadata().getTriggers();
+ Triggers triggers = update.metadata().triggers;
if (triggers.isEmpty())
return null;
List<Mutation> tmutations = Lists.newLinkedList();
@@ -238,7 +239,7 @@ public class TriggerExecutor
}
catch (Exception ex)
{
- throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", update.metadata().cfId), ex);
+ throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", update.metadata().id), ex);
}
finally
{