You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/05 17:51:59 UTC
[4/6] Finer grained exception hierarchy, and adds error codes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a2faf94/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3863eed..6f461c9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
@@ -55,7 +56,7 @@ import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.metrics.ClientRequestMetrics;
import org.apache.cassandra.net.*;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.utils.*;
public class StorageProxy implements StorageProxyMBean
@@ -107,7 +108,7 @@ public class StorageProxy implements StorageProxyMBean
IWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException, UnavailableException
+ throws IOException, OverloadedException
{
assert mutation instanceof RowMutation;
sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
@@ -164,7 +165,8 @@ public class StorageProxy implements StorageProxyMBean
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
- public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimedOutException
+ public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level)
+ throws UnavailableException, OverloadedException, WriteTimeoutException
{
logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -195,7 +197,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- catch (TimedOutException ex)
+ catch (WriteTimeoutException ex)
{
writeMetrics.timeouts.mark();
ClientRequestMetrics.writeTimeouts.inc();
@@ -204,7 +206,7 @@ public class StorageProxy implements StorageProxyMBean
List<String> mstrings = new ArrayList<String>(mutations.size());
for (IMutation mutation : mutations)
mstrings.add(mutation.toString(true));
- logger.debug("Write timeout {} for one (or more) of: ", ex.toString(), mstrings);
+ logger.debug("Write timeout {} for one (or more) of: {}", ex.toString(), mstrings);
}
throw ex;
}
@@ -214,6 +216,11 @@ public class StorageProxy implements StorageProxyMBean
ClientRequestMetrics.writeUnavailables.inc();
throw e;
}
+ catch (OverloadedException e)
+ {
+ ClientRequestMetrics.writeUnavailables.inc();
+ throw e;
+ }
catch (IOException e)
{
assert mostRecentMutation != null;
@@ -241,7 +248,7 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level,
String localDataCenter,
WritePerformer performer)
- throws UnavailableException, IOException
+ throws UnavailableException, OverloadedException, IOException
{
String table = mutation.getTable();
AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
@@ -284,7 +291,7 @@ public class StorageProxy implements StorageProxyMBean
IWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException, UnavailableException
+ throws IOException, OverloadedException
{
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>(targets.size());
@@ -299,7 +306,7 @@ public class StorageProxy implements StorageProxyMBean
if (totalHintsInProgress.get() > maxHintsInProgress
&& (hintsInProgress.get(destination).get() > 0 && shouldHint(destination)))
{
- throw new UnavailableException();
+ throw new OverloadedException("Too many in flight hints: " + totalHintsInProgress.get());
}
if (FailureDetector.instance.isAlive(destination))
@@ -470,9 +477,9 @@ public class StorageProxy implements StorageProxyMBean
* quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
* the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
*/
- public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException
+ public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException, IOException
{
- InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter);
+ InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter, cm.consistency());
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
@@ -507,12 +514,13 @@ public class StorageProxy implements StorageProxyMBean
* is unclear we want to mix those latencies with read latencies, so this
* may be a bit involved.
*/
- private static InetAddress findSuitableEndpoint(String table, ByteBuffer key, String localDataCenter) throws UnavailableException
+ private static InetAddress findSuitableEndpoint(String table, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
if (endpoints.isEmpty())
- throw new UnavailableException();
+ // TODO have a way to compute the consistency level
+ throw new UnavailableException(cl, cl.blockFor(table), 0);
List<InetAddress> localEndpoints = new ArrayList<InetAddress>();
for (InetAddress endpoint : endpoints)
@@ -532,18 +540,18 @@ public class StorageProxy implements StorageProxyMBean
}
}
-
-
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
- public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException
+ public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter)
+ throws UnavailableException, IOException, OverloadedException
{
return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer);
}
// Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
- public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException
+ public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
+ throws UnavailableException, IOException, OverloadedException
{
return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
}
@@ -573,7 +581,7 @@ public class StorageProxy implements StorageProxyMBean
// and we want to avoid blocking too much the MUTATION stage
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(MessagingService.Verb.READ)
{
- public void runMayThrow() throws IOException, TimeoutException, UnavailableException
+ public void runMayThrow() throws IOException, OverloadedException
{
// send mutation to other replica
sendToHintedEndpoints(cm.makeReplicationMutation(), targets, responseHandler, localDataCenter, consistency_level);
@@ -597,16 +605,16 @@ public class StorageProxy implements StorageProxyMBean
* a specific set of column names from a given column family.
*/
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
- throws IOException, UnavailableException, TimeoutException, InvalidRequestException
+ throws IOException, UnavailableException, IsBootstrappingException, ReadTimeoutException
{
if (StorageService.instance.isBootstrapMode() && !systemTableQuery(commands))
{
readMetrics.unavailables.mark();
ClientRequestMetrics.readUnavailables.inc();
- throw new UnavailableException();
+ throw new IsBootstrappingException();
}
long startTime = System.nanoTime();
- List<Row> rows;
+ List<Row> rows = null;
try
{
rows = fetchRows(commands, consistency_level);
@@ -617,7 +625,7 @@ public class StorageProxy implements StorageProxyMBean
ClientRequestMetrics.readUnavailables.inc();
throw e;
}
- catch (TimeoutException e)
+ catch (ReadTimeoutException e)
{
readMetrics.timeouts.mark();
ClientRequestMetrics.readTimeouts.inc();
@@ -641,7 +649,8 @@ public class StorageProxy implements StorageProxyMBean
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
- private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
+ private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level)
+ throws IOException, UnavailableException, ReadTimeoutException
{
List<Row> rows = new ArrayList<Row>(initialCommands.size());
List<ReadCommand> commandsToRetry = Collections.emptyList();
@@ -730,7 +739,7 @@ public class StorageProxy implements StorageProxyMBean
if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
}
- catch (TimeoutException ex)
+ catch (ReadTimeoutException ex)
{
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
@@ -769,9 +778,17 @@ public class StorageProxy implements StorageProxyMBean
{
ReadCommand command = repairCommands.get(i);
RepairCallback handler = repairResponseHandlers.get(i);
- // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
- // behind on writes in case the out-of-sync row is read multiple times in quick succession
- FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+ try
+ {
+ // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
+ // behind on writes in case the out-of-sync row is read multiple times in quick succession
+ FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+ }
+ catch (TimeoutException e)
+ {
+ int blockFor = consistency_level.blockFor(command.getKeyspace());
+ throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
+ }
Row row;
try
@@ -865,7 +882,7 @@ public class StorageProxy implements StorageProxyMBean
}
public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
- throws IOException, UnavailableException, TimeoutException
+ throws IOException, UnavailableException, ReadTimeoutException
{
if (logger.isDebugEnabled())
logger.debug("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level);
@@ -927,7 +944,9 @@ public class StorageProxy implements StorageProxyMBean
{
if (logger.isDebugEnabled())
logger.debug("Range slice timeout: {}", ex.toString());
- throw ex;
+ // We actually got all responses at that point
+ int blockFor = consistency_level.blockFor(command.keyspace);
+ throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true);
}
catch (DigestMismatchException e)
{
@@ -1208,7 +1227,8 @@ public class StorageProxy implements StorageProxyMBean
// Since the truncate operation is so aggressive and is typically only
// invoked by an admin, for simplicity we require that all nodes are up
// to perform the operation.
- throw new UnavailableException();
+ int liveMembers = Gossiper.instance.getLiveMembers().size();
+ throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers);
}
Set<InetAddress> allEndpoints = Gossiper.instance.getLiveMembers();
@@ -1239,7 +1259,7 @@ public class StorageProxy implements StorageProxyMBean
public interface WritePerformer
{
- public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, UnavailableException;
+ public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException;
}
private static abstract class DroppableRunnable implements Runnable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a2faf94/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4f06f1c..cc94727 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -33,6 +33,7 @@ import javax.management.ObjectName;
import static com.google.common.base.Charsets.ISO_8859_1;
import com.google.common.collect.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -47,6 +48,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.SSTableLoader;
@@ -483,7 +485,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- private void joinTokenRing(int delay) throws IOException, org.apache.cassandra.config.ConfigurationException
+ private void joinTokenRing(int delay) throws IOException, ConfigurationException
{
logger.info("Starting up server gossip");
joined = true;
@@ -2883,7 +2885,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
return old;
}
- public void truncate(String keyspace, String columnFamily) throws UnavailableException, TimeoutException, IOException
+ public void truncate(String keyspace, String columnFamily)
+ throws org.apache.cassandra.exceptions.UnavailableException, TimeoutException, IOException
{
StorageProxy.truncateBlocking(keyspace, columnFamily);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a2faf94/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8ad047e..77e9843 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -26,9 +26,9 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnavailableException;
public interface StorageServiceMBean
@@ -385,7 +385,7 @@ public interface StorageServiceMBean
public boolean isNativeTransportRunning();
// allows a node that have been started without joining the ring to join it
- public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
+ public void joinRing() throws IOException, ConfigurationException;
public boolean isJoined();
public int getExceptionCount();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a2faf94/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 0164c32..5b0f64c 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -26,10 +26,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -45,7 +45,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
{
super(writeEndpoints, consistencyLevel);
- blockFor = determineBlockFor(table);
+ blockFor = consistencyLevel.blockFor(table);
responses = new AtomicInteger(blockFor);
}
@@ -77,25 +77,9 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
return blockFor - responses.get();
}
- protected int determineBlockFor(String table)
+ protected int blockFor()
{
- switch (consistencyLevel)
- {
- case ONE:
- return 1;
- case ANY:
- return 1;
- case TWO:
- return 2;
- case THREE:
- return 3;
- case QUORUM:
- return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
- case ALL:
- return Table.open(table).getReplicationStrategy().getReplicationFactor();
- default:
- throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel.toString());
- }
+ return blockFor;
}
public void assureSufficientLiveNodes() throws UnavailableException
@@ -106,7 +90,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
// Thus we include the local node (coordinator) as a valid replica if it is there already.
int effectiveEndpoints = writeEndpoints.contains(FBUtilities.getBroadcastAddress()) ? writeEndpoints.size() : writeEndpoints.size() + 1;
if (effectiveEndpoints < responses.get())
- throw new UnavailableException();
+ throw new UnavailableException(consistencyLevel, responses.get(), effectiveEndpoints);
return;
}
@@ -119,7 +103,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
}
if (liveNodes < responses.get())
{
- throw new UnavailableException();
+ throw new UnavailableException(consistencyLevel, responses.get(), liveNodes);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a2faf94/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5eedcd7..44c6038 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -46,6 +46,10 @@ import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.scheduler.IRequestScheduler;
@@ -101,13 +105,13 @@ public class CassandraServer implements Cassandra.Iface
return cState;
}
- protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException
+ protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level)
+ throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
{
// TODO - Support multiple column families per row, right now row only contains 1 column family
Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();
- List<Row> rows;
+ List<Row> rows = null;
try
{
schedule(DatabaseDescriptor.getReadRpcTimeout());
@@ -120,10 +124,9 @@ public class CassandraServer implements Cassandra.Iface
release();
}
}
- catch (TimeoutException e)
+ catch (RequestExecutionException e)
{
- logger.debug("... timed out");
- throw new TimedOutException();
+ ThriftConversion.rethrow(e);
}
catch (IOException e)
{
@@ -265,8 +268,8 @@ public class CassandraServer implements Cassandra.Iface
return thriftSuperColumns;
}
- private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException
+ private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level)
+ throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
{
Map<DecoratedKey, ColumnFamily> columnFamilies = readColumnFamily(commands, consistency_level);
Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
@@ -327,6 +330,10 @@ public class CassandraServer implements Cassandra.Iface
return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent,
predicate, consistency_level).get(key);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -359,6 +366,10 @@ public class CassandraServer implements Cassandra.Iface
state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
return multigetSliceInternal(state().getKeyspace(), keys, column_parent, predicate, consistency_level);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -366,12 +377,14 @@ public class CassandraServer implements Cassandra.Iface
}
private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException
+ throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
{
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, predicate);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
if (predicate.column_names != null)
@@ -392,11 +405,11 @@ public class CassandraServer implements Cassandra.Iface
}
}
- return getSlice(commands, consistency_level);
+ return getSlice(commands, consistencyLevel);
}
private ColumnOrSuperColumn internal_get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
- throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
+ throws RequestValidationException, NotFoundException, UnavailableException, TimedOutException
{
ClientState cState = state();
cState.hasColumnFamilyAccess(column_path.column_family, Permission.READ);
@@ -404,14 +417,15 @@ public class CassandraServer implements Cassandra.Iface
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
ThriftValidation.validateColumnPath(metadata, column_path);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
QueryPath path = new QueryPath(column_path.column_family, column_path.column == null ? null : column_path.super_column);
List<ByteBuffer> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column);
ThriftValidation.validateKey(metadata, key);
ReadCommand command = new SliceByNamesReadCommand(keyspace, key, path, nameAsList);
- Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistency_level);
+ Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel);
ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
@@ -444,6 +458,10 @@ public class CassandraServer implements Cassandra.Iface
{
return internal_get(key, column_path, consistency_level);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -529,6 +547,10 @@ public class CassandraServer implements Cassandra.Iface
return totalCount;
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -579,6 +601,10 @@ public class CassandraServer implements Cassandra.Iface
}
return counts;
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -586,7 +612,7 @@ public class CassandraServer implements Cassandra.Iface
}
private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException
+ throws RequestValidationException, UnavailableException, TimedOutException
{
ClientState cState = state();
cState.hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
@@ -597,7 +623,7 @@ public class CassandraServer implements Cassandra.Iface
// SuperColumn field is usually optional, but not when we're inserting
if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
{
- throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
+ throw new org.apache.cassandra.exceptions.InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
}
ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
ThriftValidation.validateColumnData(metadata, column, column_parent.super_column != null);
@@ -609,7 +635,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (MarshalException e)
{
- throw new InvalidRequestException(e.getMessage());
+ throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
}
doInsert(consistency_level, Arrays.asList(rm));
}
@@ -634,6 +660,10 @@ public class CassandraServer implements Cassandra.Iface
{
internal_insert(key, column_parent, column, consistency_level);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -641,7 +671,7 @@ public class CassandraServer implements Cassandra.Iface
}
private void internal_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException
+ throws RequestValidationException, UnavailableException, TimedOutException
{
List<String> cfamsSeen = new ArrayList<String>();
List<IMutation> rowMutations = new ArrayList<IMutation>();
@@ -675,7 +705,7 @@ public class CassandraServer implements Cassandra.Iface
RowMutation rm;
if (metadata.getDefaultValidator().isCommutative())
{
- ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
+ ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
rmCounter = rmCounter == null ? new RowMutation(keyspace, key) : rmCounter;
rm = rmCounter;
}
@@ -702,7 +732,7 @@ public class CassandraServer implements Cassandra.Iface
if (rmStandard != null && !rmStandard.isEmpty())
rowMutations.add(rmStandard);
if (rmCounter != null && !rmCounter.isEmpty())
- rowMutations.add(new org.apache.cassandra.db.CounterMutation(rmCounter, consistency_level));
+ rowMutations.add(new org.apache.cassandra.db.CounterMutation(rmCounter, ThriftConversion.fromThrift(consistency_level)));
}
doInsert(consistency_level, rowMutations);
@@ -731,6 +761,10 @@ public class CassandraServer implements Cassandra.Iface
{
internal_batch_mutate(mutation_map, consistency_level);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -738,7 +772,7 @@ public class CassandraServer implements Cassandra.Iface
}
private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp)
- throws InvalidRequestException, UnavailableException, TimedOutException
+ throws RequestValidationException, UnavailableException, TimedOutException
{
ClientState cState = state();
cState.hasColumnFamilyAccess(column_path.column_family, Permission.WRITE);
@@ -747,13 +781,13 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateKey(metadata, key);
ThriftValidation.validateColumnPathOrParent(metadata, column_path);
if (isCommutativeOp)
- ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
+ ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
RowMutation rm = new RowMutation(cState.getKeyspace(), key);
rm.delete(new QueryPath(column_path), timestamp);
if (isCommutativeOp)
- doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, ThriftConversion.fromThrift(consistency_level))));
else
doInsert(consistency_level, Arrays.asList(rm));
}
@@ -778,22 +812,32 @@ public class CassandraServer implements Cassandra.Iface
{
internal_remove(key, column_path, timestamp, consistency_level, false);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
}
}
- private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException
+ private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations)
+ throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
{
- ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level, RequestType.WRITE);
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForWrite(state().getKeyspace());
if (mutations.isEmpty())
return;
schedule(DatabaseDescriptor.getWriteRpcTimeout());
try
{
- StorageProxy.mutate(mutations, consistency_level);
+ StorageProxy.mutate(mutations, consistencyLevel);
+ }
+ catch (RequestExecutionException e)
+ {
+ ThriftConversion.rethrow(e);
}
finally
{
@@ -803,13 +847,20 @@ public class CassandraServer implements Cassandra.Iface
public KsDef describe_keyspace(String table) throws NotFoundException, InvalidRequestException
{
- state().hasKeyspaceSchemaAccess(Permission.READ);
+ try
+ {
+ state().hasKeyspaceSchemaAccess(Permission.READ);
- KSMetaData ksm = Schema.instance.getTableDefinition(table);
- if (ksm == null)
- throw new NotFoundException();
+ KSMetaData ksm = Schema.instance.getTableDefinition(table);
+ if (ksm == null)
+ throw new NotFoundException();
- return ksm.toThrift();
+ return ksm.toThrift();
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
}
public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
@@ -844,51 +895,59 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, predicate);
ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
List<Row> rows = null;
- try
+
+ IPartitioner p = StorageService.getPartitioner();
+ AbstractBounds<RowPosition> bounds;
+ if (range.start_key == null)
{
- IPartitioner p = StorageService.getPartitioner();
- AbstractBounds<RowPosition> bounds;
- if (range.start_key == null)
- {
- Token.TokenFactory tokenFactory = p.getTokenFactory();
- Token left = tokenFactory.fromString(range.start_token);
- Token right = tokenFactory.fromString(range.end_token);
- bounds = Range.makeRowRange(left, right, p);
- }
- else
- {
- bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(
- range.end_key, p));
- }
- schedule(DatabaseDescriptor.getRangeRpcTimeout());
- try
- {
- IFilter filter = ThriftValidation.asIFilter(predicate,
- metadata.getComparatorFor(column_parent.super_column));
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, filter, bounds,
- range.row_filter, range.count), consistency_level);
- }
- finally
- {
- release();
- }
- assert rows != null;
+ Token.TokenFactory tokenFactory = p.getTokenFactory();
+ Token left = tokenFactory.fromString(range.start_token);
+ Token right = tokenFactory.fromString(range.end_token);
+ bounds = Range.makeRowRange(left, right, p);
}
- catch (TimeoutException e)
+ else
{
- logger.debug("... timed out");
- throw new TimedOutException();
+ bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(
+ range.end_key, p));
}
- catch (IOException e)
+ schedule(DatabaseDescriptor.getRangeRpcTimeout());
+ try
{
- throw new RuntimeException(e);
+ IFilter filter = ThriftValidation.asIFilter(predicate,
+ metadata.getComparatorFor(column_parent.super_column));
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, filter, bounds,
+ range.row_filter, range.count), consistencyLevel);
}
+ finally
+ {
+ release();
+ }
+ assert rows != null;
return thriftifyKeySlices(rows, column_parent, predicate);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (ReadTimeoutException e)
+ {
+ logger.debug("... timed out");
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (org.apache.cassandra.exceptions.UnavailableException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -921,7 +980,9 @@ public class CassandraServer implements Cassandra.Iface
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
ThriftValidation.validateKeyRange(metadata, null, range);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
@@ -945,33 +1006,38 @@ public class CassandraServer implements Cassandra.Iface
}
List<Row> rows;
+ schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
- schedule(DatabaseDescriptor.getRangeRpcTimeout());
- try
- {
- IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, filter,
- bounds, range.row_filter, range.count, true, true), consistency_level);
- }
- finally
- {
- release();
- }
- assert rows != null;
- }
- catch (TimeoutException e)
- {
- logger.debug("... timed out");
- throw new TimedOutException();
+ IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, filter,
+ bounds, range.row_filter, range.count, true, true), consistencyLevel);
}
- catch (IOException e)
+ finally
{
- throw new RuntimeException(e);
+ release();
}
+ assert rows != null;
return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (ReadTimeoutException e)
+ {
+ logger.debug("... timed out");
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (org.apache.cassandra.exceptions.UnavailableException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -991,7 +1057,8 @@ public class CassandraServer implements Cassandra.Iface
return keySlices;
}
- public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+ throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
if (startSessionIfRequested())
{
@@ -1009,7 +1076,6 @@ public class CassandraServer implements Cassandra.Iface
try
{
-
ClientState cState = state();
cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
String keyspace = cState.getKeyspace();
@@ -1017,7 +1083,8 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, column_predicate);
ThriftValidation.validateIndexClauses(metadata, index_clause);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
IPartitioner p = StorageService.getPartitioner();
AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p),
@@ -1033,23 +1100,25 @@ public class CassandraServer implements Cassandra.Iface
index_clause.expressions,
index_clause.count);
- List<Row> rows;
- try
- {
- rows = StorageProxy.getRangeSlice(command, consistency_level);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- catch (TimeoutException e)
- {
- logger.debug("... timed out");
- throw new TimedOutException();
- }
-
+ List<Row> rows = StorageProxy.getRangeSlice(command, consistencyLevel);
return thriftifyKeySlices(rows, column_parent, column_predicate);
-
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (ReadTimeoutException e)
+ {
+ logger.debug("... timed out");
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (org.apache.cassandra.exceptions.UnavailableException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
finally
{
@@ -1059,22 +1128,29 @@ public class CassandraServer implements Cassandra.Iface
public List<KsDef> describe_keyspaces() throws TException, InvalidRequestException
{
- state().hasKeyspaceSchemaAccess(Permission.READ);
-
- Set<String> keyspaces = Schema.instance.getTables();
- List<KsDef> ksset = new ArrayList<KsDef>(keyspaces.size());
- for (String ks : keyspaces)
+ try
{
- try
- {
- ksset.add(describe_keyspace(ks));
- }
- catch (NotFoundException nfe)
+ state().hasKeyspaceSchemaAccess(Permission.READ);
+
+ Set<String> keyspaces = Schema.instance.getTables();
+ List<KsDef> ksset = new ArrayList<KsDef>(keyspaces.size());
+ for (String ks : keyspaces)
{
- logger.info("Failed to find metadata for keyspace '" + ks + "'. Continuing... ");
+ try
+ {
+ ksset.add(describe_keyspace(ks));
+ }
+ catch (NotFoundException nfe)
+ {
+ logger.info("Failed to find metadata for keyspace '" + ks + "'. Continuing... ");
+ }
}
+ return ksset;
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
}
- return ksset;
}
public String describe_cluster_name() throws TException
@@ -1087,9 +1163,16 @@ public class CassandraServer implements Cassandra.Iface
return Constants.VERSION;
}
- public List<TokenRange> describe_ring(String keyspace)throws InvalidRequestException
+ public List<TokenRange> describe_ring(String keyspace) throws InvalidRequestException
{
- return StorageService.instance.describeRing(keyspace);
+ try
+ {
+ return StorageService.instance.describeRing(keyspace);
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
}
public Map<String, String> describe_token_map() throws InvalidRequestException
@@ -1112,15 +1195,22 @@ public class CassandraServer implements Cassandra.Iface
public List<String> describe_splits(String cfName, String start_token, String end_token, int keys_per_split)
throws TException, InvalidRequestException
{
- // TODO: add keyspace authorization call post CASSANDRA-1425
- Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
- List<Token> tokens = StorageService.instance.getSplits(state().getKeyspace(), cfName, new Range<Token>(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split);
- List<String> splits = new ArrayList<String>(tokens.size());
- for (Token token : tokens)
+ try
+ {
+ // TODO: add keyspace authorization call post CASSANDRA-1425
+ Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory();
+ List<Token> tokens = StorageService.instance.getSplits(state().getKeyspace(), cfName, new Range<Token>(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split);
+ List<String> splits = new ArrayList<String>(tokens.size());
+ for (Token token : tokens)
+ {
+ splits.add(tf.toString(token));
+ }
+ return splits;
+ }
+ catch (RequestValidationException e)
{
- splits.add(tf.toString(token));
+ throw ThriftConversion.toThrift(e);
}
- return splits;
}
public void login(AuthenticationRequest auth_request) throws AuthenticationException, AuthorizationException, TException
@@ -1155,10 +1245,10 @@ public class CassandraServer implements Cassandra.Iface
throws InvalidRequestException, SchemaDisagreementException, TException
{
logger.debug("add_column_family");
- state().hasColumnFamilySchemaAccess(Permission.WRITE);
try
{
+ state().hasColumnFamilySchemaAccess(Permission.WRITE);
cf_def.unsetId(); // explicitly ignore any id set by client (Hector likes to set zero)
CFMetaData cfm = CFMetaData.fromThrift(cf_def);
if (cfm.getBloomFilterFpChance() == null)
@@ -1167,11 +1257,9 @@ public class CassandraServer implements Cassandra.Iface
MigrationManager.announceNewColumnFamily(cfm);
return Schema.instance.getVersion().toString();
}
- catch (ConfigurationException e)
+ catch (RequestValidationException e)
{
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw ThriftConversion.toThrift(e);
}
}
@@ -1181,18 +1269,16 @@ public class CassandraServer implements Cassandra.Iface
logger.debug("drop_column_family");
ClientState cState = state();
- cState.hasColumnFamilySchemaAccess(Permission.WRITE);
try
{
+ cState.hasColumnFamilySchemaAccess(Permission.WRITE);
MigrationManager.announceColumnFamilyDrop(cState.getKeyspace(), column_family);
return Schema.instance.getVersion().toString();
}
- catch (ConfigurationException e)
+ catch (RequestValidationException e)
{
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw ThriftConversion.toThrift(e);
}
}
@@ -1200,21 +1286,21 @@ public class CassandraServer implements Cassandra.Iface
throws InvalidRequestException, SchemaDisagreementException, TException
{
logger.debug("add_keyspace");
- ThriftValidation.validateKeyspaceNotSystem(ks_def.name);
- state().hasKeyspaceSchemaAccess(Permission.WRITE);
- ThriftValidation.validateKeyspaceNotYetExisting(ks_def.name);
-
- // generate a meaningful error if the user setup keyspace and/or column definition incorrectly
- for (CfDef cf : ks_def.cf_defs)
+ try
{
- if (!cf.getKeyspace().equals(ks_def.getName()))
+ ThriftValidation.validateKeyspaceNotSystem(ks_def.name);
+ state().hasKeyspaceSchemaAccess(Permission.WRITE);
+ ThriftValidation.validateKeyspaceNotYetExisting(ks_def.name);
+
+ // generate a meaningful error if the user setup keyspace and/or column definition incorrectly
+ for (CfDef cf : ks_def.cf_defs)
{
- throw new InvalidRequestException("CfDef (" + cf.getName() +") had a keyspace definition that did not match KsDef");
+ if (!cf.getKeyspace().equals(ks_def.getName()))
+ {
+ throw new InvalidRequestException("CfDef (" + cf.getName() +") had a keyspace definition that did not match KsDef");
+ }
}
- }
- try
- {
Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size());
for (CfDef cf_def : ks_def.cf_defs)
{
@@ -1226,11 +1312,9 @@ public class CassandraServer implements Cassandra.Iface
MigrationManager.announceNewKeyspace(KSMetaData.fromThrift(ks_def, cfDefs.toArray(new CFMetaData[cfDefs.size()])));
return Schema.instance.getVersion().toString();
}
- catch (ConfigurationException e)
+ catch (RequestValidationException e)
{
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw ThriftConversion.toThrift(e);
}
}
@@ -1238,19 +1322,17 @@ public class CassandraServer implements Cassandra.Iface
throws InvalidRequestException, SchemaDisagreementException, TException
{
logger.debug("drop_keyspace");
- ThriftValidation.validateKeyspaceNotSystem(keyspace);
- state().hasKeyspaceSchemaAccess(Permission.WRITE);
-
try
{
+ ThriftValidation.validateKeyspaceNotSystem(keyspace);
+ state().hasKeyspaceSchemaAccess(Permission.WRITE);
+
MigrationManager.announceKeyspaceDrop(keyspace);
return Schema.instance.getVersion().toString();
}
- catch (ConfigurationException e)
+ catch (RequestValidationException e)
{
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw ThriftConversion.toThrift(e);
}
}
@@ -1261,22 +1343,20 @@ public class CassandraServer implements Cassandra.Iface
throws InvalidRequestException, SchemaDisagreementException, TException
{
logger.debug("update_keyspace");
- ThriftValidation.validateKeyspaceNotSystem(ks_def.name);
- state().hasKeyspaceSchemaAccess(Permission.WRITE);
- ThriftValidation.validateTable(ks_def.name);
- if (ks_def.getCf_defs() != null && ks_def.getCf_defs().size() > 0)
- throw new InvalidRequestException("Keyspace update must not contain any column family definitions.");
-
try
{
+ ThriftValidation.validateKeyspaceNotSystem(ks_def.name);
+ state().hasKeyspaceSchemaAccess(Permission.WRITE);
+ ThriftValidation.validateTable(ks_def.name);
+ if (ks_def.getCf_defs() != null && ks_def.getCf_defs().size() > 0)
+ throw new InvalidRequestException("Keyspace update must not contain any column family definitions.");
+
MigrationManager.announceKeyspaceUpdate(KSMetaData.fromThrift(ks_def));
return Schema.instance.getVersion().toString();
}
- catch (ConfigurationException e)
+ catch (RequestValidationException e)
{
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw ThriftConversion.toThrift(e);
}
}
@@ -1284,45 +1364,44 @@ public class CassandraServer implements Cassandra.Iface
throws InvalidRequestException, SchemaDisagreementException, TException
{
logger.debug("update_column_family");
- state().hasColumnFamilySchemaAccess(Permission.WRITE);
- if (cf_def.keyspace == null || cf_def.name == null)
- throw new InvalidRequestException("Keyspace and CF name must be set.");
- CFMetaData oldCfm = Schema.instance.getCFMetaData(cf_def.keyspace, cf_def.name);
- if (oldCfm == null)
- throw new InvalidRequestException("Could not find column family definition to modify.");
-
try
{
+ state().hasColumnFamilySchemaAccess(Permission.WRITE);
+ if (cf_def.keyspace == null || cf_def.name == null)
+ throw new InvalidRequestException("Keyspace and CF name must be set.");
+ CFMetaData oldCfm = Schema.instance.getCFMetaData(cf_def.keyspace, cf_def.name);
+ if (oldCfm == null)
+ throw new InvalidRequestException("Could not find column family definition to modify.");
+
CFMetaData.applyImplicitDefaults(cf_def);
CFMetaData cfm = CFMetaData.fromThrift(cf_def);
cfm.addDefaultIndexNames();
MigrationManager.announceColumnFamilyUpdate(cfm);
return Schema.instance.getVersion().toString();
}
- catch (ConfigurationException e)
+ catch (RequestValidationException e)
{
- InvalidRequestException ex = new InvalidRequestException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw ThriftConversion.toThrift(e);
}
}
public void truncate(String cfname) throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
ClientState cState = state();
- cState.hasColumnFamilyAccess(cfname, Permission.WRITE);
-
- if (startSessionIfRequested())
- {
- Tracing.instance().begin("truncate", ImmutableMap.of("cf", cfname, "ks", cState.getKeyspace()));
- }
- else
- {
- logger.debug("truncating {}.{}", cState.getKeyspace(), cfname);
- }
try
{
+ cState.hasColumnFamilyAccess(cfname, Permission.WRITE);
+
+ if (startSessionIfRequested())
+ {
+ Tracing.instance().begin("truncate", ImmutableMap.of("cf", cfname, "ks", cState.getKeyspace()));
+ }
+ else
+ {
+ logger.debug("truncating {}.{}", cState.getKeyspace(), cfname);
+ }
+
schedule(DatabaseDescriptor.getTruncateRpcTimeout());
try
{
@@ -1333,6 +1412,14 @@ public class CassandraServer implements Cassandra.Iface
release();
}
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
+ catch (org.apache.cassandra.exceptions.UnavailableException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
catch (TimeoutException e)
{
logger.debug("... timed out");
@@ -1350,9 +1437,15 @@ public class CassandraServer implements Cassandra.Iface
public void set_keyspace(String keyspace) throws InvalidRequestException, TException
{
- ThriftValidation.validateTable(keyspace);
-
- state().setKeyspace(keyspace);
+ try
+ {
+ ThriftValidation.validateTable(keyspace);
+ state().setKeyspace(keyspace);
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
}
public Map<String, List<String>> describe_schema_versions() throws TException, InvalidRequestException
@@ -1387,27 +1480,28 @@ public class CassandraServer implements Cassandra.Iface
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
ThriftValidation.validateKey(metadata, key);
- ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
+ ThriftConversion.fromThrift(consistency_level).validateCounterForWrite(metadata);
ThriftValidation.validateColumnParent(metadata, column_parent);
// SuperColumn field is usually optional, but not when we're adding
if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
- {
- throw new InvalidRequestException("missing mandatory super column name for super CF "
- + column_parent.column_family);
- }
+ throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
+
ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
RowMutation rm = new RowMutation(keyspace, key);
try
{
- rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name),
- column.value);
+ rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value);
}
catch (MarshalException e)
{
throw new InvalidRequestException(e.getMessage());
}
- doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, ThriftConversion.fromThrift(consistency_level))));
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
}
finally
{
@@ -1416,7 +1510,7 @@ public class CassandraServer implements Cassandra.Iface
}
public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException, TException
+ throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
if (startSessionIfRequested())
{
@@ -1434,6 +1528,10 @@ public class CassandraServer implements Cassandra.Iface
{
internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
}
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -1520,6 +1618,15 @@ public class CassandraServer implements Cassandra.Iface
else
return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
}
+ catch (RequestExecutionException e)
+ {
+ ThriftConversion.rethrow(e);
+ return null;
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -1532,13 +1639,20 @@ public class CassandraServer implements Cassandra.Iface
if (logger.isDebugEnabled())
logger.debug("prepare_cql_query");
- String queryString = uncompress(query,compression);
+ try
+ {
+ String queryString = uncompress(query,compression);
- ClientState cState = state();
- if (cState.getCQLVersion().major == 2)
- return QueryProcessor.prepare(queryString, cState);
- else
- return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState).toThriftPreparedResult();
+ ClientState cState = state();
+ if (cState.getCQLVersion().major == 2)
+ return QueryProcessor.prepare(queryString, cState);
+ else
+ return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState).toThriftPreparedResult();
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
}
public CqlResult execute_prepared_cql_query(int itemId, List<ByteBuffer> bindVariables)
@@ -1576,10 +1690,18 @@ public class CassandraServer implements Cassandra.Iface
logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
statement.getBoundsTerms());
- return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables)
- .toThriftResult();
+ return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables).toThriftResult();
}
}
+ catch (RequestExecutionException e)
+ {
+ ThriftConversion.rethrow(e);
+ return null;
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
finally
{
Tracing.instance().stopSession();
@@ -1590,7 +1712,14 @@ public class CassandraServer implements Cassandra.Iface
{
logger.debug("set_cql_version: " + version);
- state().setCQLVersion(version);
+ try
+ {
+ state().setCQLVersion(version);
+ }
+ catch (RequestValidationException e)
+ {
+ throw ThriftConversion.toThrift(e);
+ }
}
public ByteBuffer trace_next_query() throws TException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a2faf94/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
new file mode 100644
index 0000000..ac823e9
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+
+/**
+ * Static utility methods to convert internal structures to and from their thrift counterparts.
+ * <p>
+ * Unqualified {@code ConsistencyLevel}, {@code InvalidRequestException}, {@code UnavailableException}
+ * and {@code TimedOutException} are the thrift types of this package; the internal types are either
+ * fully qualified or imported from {@code org.apache.cassandra.exceptions}.
+ * <p>
+ * Every exception conversion keeps the internal exception as the cause of the thrift one, so that
+ * server-side logs still show where the failure originated.
+ */
+public class ThriftConversion
+{
+    /**
+     * Converts an internal consistency level to the thrift level of the same name.
+     *
+     * @param cl the internal consistency level
+     * @return the thrift consistency level with the same name
+     * @throws AssertionError if {@code cl} is not one of the levels mapped below
+     */
+    public static ConsistencyLevel toThrift(org.apache.cassandra.db.ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ANY: return ConsistencyLevel.ANY;
+            case ONE: return ConsistencyLevel.ONE;
+            case TWO: return ConsistencyLevel.TWO;
+            case THREE: return ConsistencyLevel.THREE;
+            case QUORUM: return ConsistencyLevel.QUORUM;
+            case ALL: return ConsistencyLevel.ALL;
+            case LOCAL_QUORUM: return ConsistencyLevel.LOCAL_QUORUM;
+            case EACH_QUORUM: return ConsistencyLevel.EACH_QUORUM;
+        }
+        // Only reachable if a level is added to the enum without a mapping here.
+        throw new AssertionError("Unmapped consistency level: " + cl);
+    }
+
+    /**
+     * Converts a thrift consistency level to the internal level of the same name.
+     *
+     * @param cl the thrift consistency level
+     * @return the internal consistency level with the same name
+     * @throws AssertionError if {@code cl} is not one of the levels mapped below
+     */
+    public static org.apache.cassandra.db.ConsistencyLevel fromThrift(ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ANY: return org.apache.cassandra.db.ConsistencyLevel.ANY;
+            case ONE: return org.apache.cassandra.db.ConsistencyLevel.ONE;
+            case TWO: return org.apache.cassandra.db.ConsistencyLevel.TWO;
+            case THREE: return org.apache.cassandra.db.ConsistencyLevel.THREE;
+            case QUORUM: return org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+            case ALL: return org.apache.cassandra.db.ConsistencyLevel.ALL;
+            case LOCAL_QUORUM: return org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM;
+            case EACH_QUORUM: return org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+        }
+        // Only reachable if a level is added to the enum without a mapping here.
+        throw new AssertionError("Unmapped consistency level: " + cl);
+    }
+
+    /**
+     * Throws the thrift exception corresponding to an internal execution exception.
+     * This method always throws and never returns normally.
+     *
+     * @param e the internal execution exception to translate
+     * @throws TimedOutException if {@code e} is a {@link RequestTimeoutException}
+     * @throws UnavailableException for every other kind of execution exception
+     */
+    public static void rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
+    {
+        if (e instanceof RequestTimeoutException)
+            throw toThrift((RequestTimeoutException)e);
+        else
+            throw withCause(new UnavailableException(), e);
+    }
+
+    /**
+     * Converts an internal validation exception to a thrift invalid-request exception.
+     *
+     * @param e the internal validation exception
+     * @return a thrift exception carrying the message of {@code e}, with {@code e} as its cause
+     */
+    public static InvalidRequestException toThrift(RequestValidationException e)
+    {
+        return withCause(new InvalidRequestException(e.getMessage()), e);
+    }
+
+    /**
+     * Converts an internal invalid-request exception to the thrift one.
+     *
+     * @param e the internal invalid-request exception
+     * @return a thrift exception carrying the message of {@code e}, with {@code e} as its cause
+     */
+    public static InvalidRequestException toThrift(org.apache.cassandra.exceptions.InvalidRequestException e)
+    {
+        return withCause(new InvalidRequestException(e.getMessage()), e);
+    }
+
+    /**
+     * Converts an internal unavailable exception to the thrift one.
+     *
+     * @param e the internal unavailable exception
+     * @return a thrift unavailable exception with {@code e} as its cause
+     */
+    public static UnavailableException toThrift(org.apache.cassandra.exceptions.UnavailableException e)
+    {
+        return withCause(new UnavailableException(), e);
+    }
+
+    /**
+     * Converts an internal timeout exception to the thrift one.
+     *
+     * @param e the internal timeout exception
+     * @return a thrift timed-out exception with {@code e} as its cause; for a
+     *         {@link WriteTimeoutException} its {@code received} value is copied into
+     *         {@code acknowledged_by}
+     */
+    public static TimedOutException toThrift(RequestTimeoutException e)
+    {
+        TimedOutException toe = new TimedOutException();
+        if (e instanceof WriteTimeoutException)
+            toe.setAcknowledged_by(((WriteTimeoutException)e).received);
+        return withCause(toe, e);
+    }
+
+    /**
+     * Attaches {@code cause} to a freshly created exception and returns that exception,
+     * so a conversion can be written as a single expression.
+     */
+    private static <T extends Throwable> T withCause(T converted, Throwable cause)
+    {
+        // initCause may be called at most once; every caller passes a newly built exception.
+        converted.initCause(cause);
+        return converted;
+    }
+}