You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/22 20:36:19 UTC
svn commit: r1084315 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/c...
Author: jbellis
Date: Tue Mar 22 19:36:18 2011
New Revision: 1084315
URL: http://svn.apache.org/viewvc?rev=1084315&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/schema-sample.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/pig/README.txt
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g
cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java
cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
cassandra/trunk/test/system/__init__.py
cassandra/trunk/test/system/test_thrift_server.py
cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7:1026516-1082796
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7:1026516-1084291
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Mar 22 19:36:18 2011
@@ -22,6 +22,12 @@
* queue secondary indexes for flush before the parent (CASSANDRA-2330)
* shut down server for OOM on a Thrift thread (CASSANDRA-2269)
* reduce contention on Table.flusherLock (CASSANDRA-1954)
+ * fix comparator used for non-indexed secondary expressions in index scan
+ (CASSANDRA-2347)
+ * ensure size calculation and write phase of large-row compaction use
+ the same threshold for TTL expiration (CASSANDRA-2349)
+ * fix race when iterating CFs during add/drop (CASSANDRA-2350)
+ * add ConsistencyLevel command to CLI (CASSANDRA-2354)
0.7.4
Modified: cassandra/trunk/conf/schema-sample.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/schema-sample.txt?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/conf/schema-sample.txt (original)
+++ cassandra/trunk/conf/schema-sample.txt Tue Mar 22 19:36:18 2011
@@ -1,7 +1,7 @@
/*This file contains an example Keyspace that can be created using the
cassandra-cli command line interface as follows.
-bin/cassandra-cli -host localhost --file conf/Keyspace1.txt
+bin/cassandra-cli -host localhost --file conf/schema-sample.txt
The cassandra-cli includes online help that explains the statements below. You can
accessed the help without connecting to a running cassandra instance by starting the
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1082796
+/cassandra/branches/cassandra-0.7/contrib:1026516-1084291
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Tue Mar 22 19:36:18 2011
@@ -47,6 +47,11 @@ grunt> orderednames = ORDER namecounts B
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
+Slices on columns can also be specified:
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1&slice_start=C2&slice_end=C4&i&limit=1&reversed=true' USING CassandraStorage();
+
+Binary values for slice_start and slice_end can be escaped such as '\u0255'
+
Outputting to Cassandra requires the same format from input, so the simplest example is:
grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();
Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Tue Mar 22 19:36:18 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,10 +61,16 @@ public class CassandraStorage extends Lo
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+ private ByteBuffer slice_start = BOUND;
+ private ByteBuffer slice_end = BOUND;
+ private boolean slice_reverse = false;
+ private String keyspace;
+ private String column_family;
+
private Configuration conf;
private RecordReader reader;
private RecordWriter writer;
- private final int limit;
+ private int limit;
public CassandraStorage()
{
@@ -149,7 +156,7 @@ public class CassandraStorage extends Lo
this.reader = reader;
}
- private String[] parseLocation(String location) throws IOException
+ private void setLocationFromUri(String location) throws IOException
{
// parse uri into keyspace and columnfamily
String names[];
@@ -157,14 +164,30 @@ public class CassandraStorage extends Lo
{
if (!location.startsWith("cassandra://"))
throw new Exception("Bad scheme.");
- String[] parts = location.split("/+");
- names = new String[]{ parts[1], parts[2] };
+ String[] urlParts = location.split("\\?");
+ if (urlParts.length > 1)
+ {
+ for (String param : urlParts[1].split("&"))
+ {
+ String[] pair = param.split("=");
+ if (pair[0].equals("slice_start"))
+ slice_start = ByteBufferUtil.bytes(pair[1]);
+ else if (pair[0].equals("slice_end"))
+ slice_end = ByteBufferUtil.bytes(pair[1]);
+ else if (pair[0].equals("reversed"))
+ slice_reverse = Boolean.parseBoolean(pair[1]);
+ else if (pair[0].equals("limit"))
+ limit = Integer.parseInt(pair[1]);
+ }
+ }
+ String[] parts = urlParts[0].split("/+");
+ keyspace = parts[1];
+ column_family = parts[2];
}
catch (Exception e)
{
- throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
+ throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
}
- return names;
}
private void setConnectionInformation() throws IOException
@@ -186,12 +209,15 @@ public class CassandraStorage extends Lo
@Override
public void setLocation(String location, Job job) throws IOException
{
- SliceRange range = new SliceRange(BOUND, BOUND, false, limit);
- SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
conf = job.getConfiguration();
- ConfigHelper.setInputSlicePredicate(conf, predicate);
- String[] names = parseLocation(location);
- ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+ setLocationFromUri(location);
+ if (ConfigHelper.getRawInputSlicePredicate(conf) == null)
+ {
+ SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
+ SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+ ConfigHelper.setInputSlicePredicate(conf, predicate);
+ }
+ ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
}
@@ -214,8 +240,8 @@ public class CassandraStorage extends Lo
public void setStoreLocation(String location, Job job) throws IOException
{
conf = job.getConfiguration();
- String[] names = parseLocation(location);
- ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+ setLocationFromUri(location);
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
}
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1084291
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1084291
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1084291
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1084291
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1084291
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java Tue Mar 22 19:36:18 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.cache;
*/
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -139,4 +140,9 @@ public class InstrumentedCache<K, V>
{
return map.keySet();
}
+
+ public Set<Map.Entry<K, V>> getEntrySet()
+ {
+ return map.entrySet();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g Tue Mar 22 19:36:18 2011
@@ -58,6 +58,7 @@ tokens {
NODE_LIST;
NODE_TRUNCATE;
NODE_ASSUME;
+ NODE_CONSISTENCY_LEVEL;
// Internal Nodes.
NODE_COLUMN_ACCESS;
@@ -155,6 +156,7 @@ statement
| listStatement
| truncateStatement
| assumeStatement
+ | consistencyLevelStatement
| -> ^(NODE_NO_OP)
;
@@ -212,6 +214,8 @@ helpStatement
-> ^(NODE_HELP NODE_TRUNCATE)
| HELP ASSUME
-> ^(NODE_HELP NODE_ASSUME)
+ | HELP CONSISTENCYLEVEL
+ -> ^(NODE_HELP NODE_CONSISTENCY_LEVEL)
| HELP
-> ^(NODE_HELP)
| '?'
@@ -279,6 +283,11 @@ assumeStatement
-> ^(NODE_ASSUME columnFamily $assumptionElement $defaultType)
;
+consistencyLevelStatement
+ : CONSISTENCYLEVEL 'AS' defaultType=Identifier
+ -> ^(NODE_CONSISTENCY_LEVEL $defaultType)
+ ;
+
showClusterName
: SHOW 'CLUSTER NAME'
-> ^(NODE_SHOW_CLUSTER_NAME)
@@ -533,6 +542,7 @@ LIMIT: 'LIMIT';
TRUNCATE: 'TRUNCATE';
ASSUME: 'ASSUME';
TTL: 'TTL';
+CONSISTENCYLEVEL: 'CONSISTENCYLEVEL';
IP_ADDRESS
: IntegerLiteral '.' IntegerLiteral '.' IntegerLiteral '.' IntegerLiteral
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Mar 22 19:36:18 2011
@@ -106,8 +106,8 @@ public class CliClient extends CliUserHe
private String username = null;
private Map<String, KsDef> keyspacesMap = new HashMap<String, KsDef>();
private Map<String, AbstractType> cfKeysComparators;
-
-
+ private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+
public CliClient(CliSessionState cliSessionState, Cassandra.Client thriftClient)
{
this.sessionState = cliSessionState;
@@ -193,6 +193,9 @@ public class CliClient extends CliUserHe
case CliParser.NODE_ASSUME:
executeAssumeStatement(tree);
break;
+ case CliParser.NODE_CONSISTENCY_LEVEL:
+ executeConsistencyLevelStatement(tree);
+ break;
case CliParser.NODE_NO_OP:
// comment lines come here; they are treated as no ops.
break;
@@ -256,7 +259,7 @@ public class CliClient extends CliUserHe
SliceRange range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
SlicePredicate predicate = new SlicePredicate().setColumn_names(null).setSlice_range(range);
- int count = thriftClient.get_count(ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)), colParent, predicate, ConsistencyLevel.ONE);
+ int count = thriftClient.get_count(ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)), colParent, predicate, consistencyLevel);
sessionState.out.printf("%d columns%n", count);
}
@@ -306,7 +309,7 @@ public class CliClient extends CliUserHe
path.setColumn(columnName);
thriftClient.remove(ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)), path,
- FBUtilities.timestampMicros(), ConsistencyLevel.ONE);
+ FBUtilities.timestampMicros(), consistencyLevel);
sessionState.out.println(String.format("%s removed.", (columnSpecCnt == 0) ? "row" : "column"));
}
@@ -319,7 +322,7 @@ public class CliClient extends CliUserHe
parent.setSuper_column(superColumnName);
SliceRange range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000000);
- List<ColumnOrSuperColumn> columns = thriftClient.get_slice(key, parent, new SlicePredicate().setColumn_names(null).setSlice_range(range), ConsistencyLevel.ONE);
+ List<ColumnOrSuperColumn> columns = thriftClient.get_slice(key, parent, new SlicePredicate().setColumn_names(null).setSlice_range(range), consistencyLevel);
AbstractType validator;
CfDef cfDef = getCfDef(columnFamily);
@@ -445,7 +448,7 @@ public class CliClient extends CliUserHe
Column column;
try
{
- column = thriftClient.get(key, path, ConsistencyLevel.ONE).column;
+ column = thriftClient.get(key, path, consistencyLevel).column;
}
catch (NotFoundException e)
{
@@ -571,7 +574,7 @@ public class CliClient extends CliUserHe
try
{
ColumnParent parent = new ColumnParent(columnFamily);
- slices = thriftClient.get_indexed_slices(parent, clause, predicate, ConsistencyLevel.ONE);
+ slices = thriftClient.get_indexed_slices(parent, clause, predicate, consistencyLevel);
printSliceList(columnFamilyDef, slices);
}
catch (InvalidRequestException e)
@@ -668,7 +671,7 @@ public class CliClient extends CliUserHe
}
// do the insert
- thriftClient.insert(getKeyAsBytes(columnFamily, keyTree), parent, columnToInsert, ConsistencyLevel.ONE);
+ thriftClient.insert(getKeyAsBytes(columnFamily, keyTree), parent, columnToInsert, consistencyLevel);
sessionState.out.println("Value inserted.");
}
@@ -1061,7 +1064,7 @@ public class CliClient extends CliUserHe
range.setStart_key(startKey).setEnd_key(endKey);
ColumnParent columnParent = new ColumnParent(columnFamily);
- List<KeySlice> keySlices = thriftClient.get_range_slices(columnParent, predicate, range, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = thriftClient.get_range_slices(columnParent, predicate, range, consistencyLevel);
printSliceList(columnFamilyDef, keySlices);
}
@@ -1090,6 +1093,32 @@ public class CliClient extends CliUserHe
}
/**
+ * Command: CONSISTENCYLEVEL AS (ONE | QUORUM ...)
+ * Tree: ^(NODE_CONSISTENCY_LEVEL AS (ONE | QUORUM ...))
+ * @param statement - tree representing current statement
+ */
+ private void executeConsistencyLevelStatement(Tree statement)
+ {
+ if (!CliMain.isConnected())
+ return;
+
+ String userSuppliedLevel = statement.getChild(0).getText().toUpperCase();
+
+ try
+ {
+ consistencyLevel = ConsistencyLevel.valueOf(userSuppliedLevel);
+ }
+ catch (IllegalArgumentException e)
+ {
+ String elements = "ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, ANY";
+ sessionState.out.println(String.format("'%s' is invalid. Available: %s", userSuppliedLevel, elements));
+ return;
+ }
+
+ sessionState.out.println(String.format("Consistency level is set to '%s'.", consistencyLevel));
+ }
+
+ /**
* Command: ASSUME <columnFamily> (VALIDATOR | COMPARATOR | KEYS | SUB_COMPARATOR) AS <type>
* Tree: ^(NODE_ASSUME <columnFamily> (VALIDATOR | COMPARATOR | KEYS | SUB_COMPARATOR) <type>))
* @param statement - tree representing current statement
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java Tue Mar 22 19:36:18 2011
@@ -36,6 +36,7 @@ public class CliCompleter extends Simple
"drop column family",
"rename keyspace",
"rename column family",
+ "consistencylevel",
"help connect",
"help describe keyspace",
@@ -56,7 +57,8 @@ public class CliCompleter extends Simple
"help del",
"help count",
"help list",
- "help truncate"
+ "help truncate",
+ "help consistencylevel"
};
private static String[] keyspaceCommands = {
"get",
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java Tue Mar 22 19:36:18 2011
@@ -327,7 +327,11 @@ public class CliUserHelp {
state.out.println("example:");
state.out.println("assume Users comparator as lexicaluuid;");
break;
-
+ case CliParser.NODE_CONSISTENCY_LEVEL:
+ state.out.println("consistencylevel as <level>");
+ state.out.println("example:");
+ state.out.println("consistencylevel as QUORUM");
+ break;
default:
state.out.println("?");
break;
@@ -374,6 +378,8 @@ public class CliUserHelp {
state.out.println("truncate <column_family>; Truncate specified column family.");
state.out.println("assume <column_family> <attribute> as <type>;");
state.out.println(" Assume a given column family attributes to match a specified type.");
+ state.out.println("consistencylevel as <level>;");
+ state.out.println(" Change the consistency level for set,get, and list operations.");
state.out.println("list <cf>; List all rows in the column family.");
state.out.println("list <cf>[<startKey>:];");
state.out.println(" List rows in the column family beginning with <startKey>.");
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Mar 22 19:36:18 2011
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.AbstractCommutativeType;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.FBUtilities;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Tue Mar 22 19:36:18 2011
@@ -132,7 +132,7 @@ public class ColumnFamilySerializer impl
ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
for (int i = 0; i < size; ++i)
{
- IColumn column = cf.getColumnSerializer().deserialize(dis, interner, fromRemote);
+ IColumn column = cf.getColumnSerializer().deserialize(dis, interner, fromRemote, (int) (System.currentTimeMillis() / 1000));
cf.addColumn(column);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Mar 22 19:36:18 2011
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.cache.AutoSavingKeyCache;
import org.apache.cassandra.cache.AutoSavingRowCache;
+import org.apache.cassandra.cache.JMXInstrumentedCache;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -1624,7 +1625,7 @@ public class ColumnFamilyStore implement
IColumn column = data.getColumn(expression.column_name);
if (column == null)
return false;
- int v = data.getComparator().compare(column.value(), expression.value);
+ int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
if (!satisfies(v, expression.op))
return false;
}
@@ -1769,6 +1770,16 @@ public class ColumnFamilyStore implement
return keyCache.getSize();
}
+ public JMXInstrumentedCache<DecoratedKey, ColumnFamily> getRowCache()
+ {
+ return ssTables.getRowCache();
+ }
+
+ public JMXInstrumentedCache<Pair<Descriptor, DecoratedKey>, Long> getKeyCache()
+ {
+ return ssTables.getKeyCache();
+ }
+
public static Iterable<ColumnFamilyStore> all()
{
Iterable<ColumnFamilyStore>[] stores = new Iterable[DatabaseDescriptor.getTables().size()];
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Tue Mar 22 19:36:18 2011
@@ -25,14 +25,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
public class ColumnSerializer implements IColumnSerializer
@@ -81,6 +79,11 @@ public class ColumnSerializer implements
*/
public Column deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote) throws IOException
{
+ return deserialize(dis, interner, fromRemote, (int) (System.currentTimeMillis() / 1000));
+ }
+
+ public Column deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException
+ {
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
if (name.remaining() <= 0)
throw new CorruptColumnException("invalid column name length " + name.remaining());
@@ -103,7 +106,7 @@ public class ColumnSerializer implements
int expiration = dis.readInt();
long ts = dis.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(dis);
- if ((int) (System.currentTimeMillis() / 1000 ) > expiration)
+ if (expiration < expireBefore)
{
// the column is now expired, we can safely return a simple
// tombstone
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue Mar 22 19:36:18 2011
@@ -375,8 +375,7 @@ public class CompactionManager implement
{
for (String ksname : DatabaseDescriptor.getNonSystemTables())
{
- Table ks = Table.open(ksname);
- for (ColumnFamilyStore cfs : ks.columnFamilyStores.values())
+ for (ColumnFamilyStore cfs : Table.open(ksname).getColumnFamilyStores())
cfs.disableAutoCompaction();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Tue Mar 22 19:36:18 2011
@@ -371,6 +371,11 @@ class SuperColumnSerializer implements I
public IColumn deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote) throws IOException
{
+ return deserialize(dis, interner, fromRemote, (int)(System.currentTimeMillis() / 1000));
+ }
+
+ public IColumn deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException
+ {
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
int localDeleteTime = dis.readInt();
if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
@@ -382,7 +387,7 @@ class SuperColumnSerializer implements I
/* read the number of columns */
int size = dis.readInt();
ColumnSerializer serializer = Column.serializer();
- ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, interner, size, fromRemote);
+ ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, interner, size, fromRemote, expireBefore);
SuperColumn superColumn = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer,IColumn>(preSortedMap));
if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Mar 22 19:36:18 2011
@@ -23,10 +23,7 @@ import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -91,7 +88,7 @@ public class Table
/* Table name. */
public final String name;
/* ColumnFamilyStore per column family */
- public final Map<Integer, ColumnFamilyStore> columnFamilyStores = new HashMap<Integer, ColumnFamilyStore>(); // TODO make private again
+ private final Map<Integer, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<Integer, ColumnFamilyStore>();
private final Object[] indexLocks;
private ScheduledFuture<?> flushTask;
private volatile AbstractReplicationStrategy replicationStrategy;
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Tue Mar 22 19:36:18 2011
@@ -159,6 +159,11 @@ public class ConfigHelper
return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
}
+ public static String getRawInputSlicePredicate(Configuration conf)
+ {
+ return conf.get(INPUT_PREDICATE_CONFIG);
+ }
+
private static String predicateToString(SlicePredicate predicate)
{
assert predicate != null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java Tue Mar 22 19:36:18 2011
@@ -28,5 +28,5 @@ import org.apache.cassandra.db.IColumn;
public interface IColumnSerializer extends ICompactSerializer2<IColumn>
{
- public IColumn deserialize(DataInput in, ColumnFamilyStore interner, boolean fromRemote) throws IOException;
+ public IColumn deserialize(DataInput in, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Tue Mar 22 19:36:18 2011
@@ -52,6 +52,9 @@ public class SSTableIdentityIterator imp
public final int columnCount;
private final long columnPosition;
+ // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
+ private final int expireBefore;
+
/**
* Used to iterate through the columns of a row.
* @param sstable SSTable we are reading ffrom.
@@ -75,6 +78,7 @@ public class SSTableIdentityIterator imp
this.key = key;
this.dataStart = dataStart;
this.dataSize = dataSize;
+ this.expireBefore = (int)(System.currentTimeMillis() / 1000);
finishedAt = dataStart + dataSize;
try
@@ -137,7 +141,7 @@ public class SSTableIdentityIterator imp
{
try
{
- return sstable.getColumnSerializer().deserialize(file);
+ return sstable.getColumnSerializer().deserialize(file, null, false, expireBefore);
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Mar 22 19:36:18 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
@@ -575,7 +575,7 @@ public class SSTableReader extends SSTab
return ColumnFamily.create(metadata);
}
- public ICompactSerializer2<IColumn> getColumnSerializer()
+ public IColumnSerializer getColumnSerializer()
{
return metadata.cfType == ColumnFamilyType.Standard
? Column.serializer()
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Tue Mar 22 19:36:18 2011
@@ -45,8 +45,9 @@ public class ColumnSortedMap implements
private final int length;
private final ColumnFamilyStore interner;
private final boolean fromRemote;
+ private final int expireBefore;
- public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote)
+ public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
{
this.comparator = comparator;
this.serializer = serializer;
@@ -54,6 +55,7 @@ public class ColumnSortedMap implements
this.dis = dis;
this.length = length;
this.fromRemote = fromRemote;
+ this.expireBefore = expireBefore;
}
public int size()
@@ -143,7 +145,7 @@ public class ColumnSortedMap implements
public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
{
- return new ColumnSet(serializer, dis, interner, length, fromRemote);
+ return new ColumnSet(serializer, dis, interner, length, fromRemote, expireBefore);
}
}
@@ -154,14 +156,16 @@ class ColumnSet implements Set<Map.Entry
private final int length;
private final ColumnFamilyStore interner;
private boolean fromRemote;
+ private final int expireBefore;
- public ColumnSet(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote)
+ public ColumnSet(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
{
this.serializer = serializer;
this.dis = dis;
this.interner = interner;
this.length = length;
this.fromRemote = fromRemote;
+ this.expireBefore = expireBefore;
}
public int size()
@@ -181,7 +185,7 @@ class ColumnSet implements Set<Map.Entry
public Iterator<Entry<ByteBuffer, IColumn>> iterator()
{
- return new ColumnIterator(serializer, dis, interner, length, fromRemote);
+ return new ColumnIterator(serializer, dis, interner, length, fromRemote, expireBefore);
}
public Object[] toArray()
@@ -237,14 +241,16 @@ class ColumnIterator implements Iterator
private final boolean fromRemote;
private int count = 0;
private ColumnFamilyStore interner;
+ private final int expireBefore;
- public ColumnIterator(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote)
+ public ColumnIterator(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
{
this.dis = dis;
this.serializer = serializer;
this.interner = interner;
this.length = length;
this.fromRemote = fromRemote;
+ this.expireBefore = expireBefore;
}
private IColumn deserializeNext()
@@ -252,7 +258,7 @@ class ColumnIterator implements Iterator
try
{
count++;
- return serializer.deserialize(dis, interner, fromRemote);
+ return serializer.deserialize(dis, interner, fromRemote, expireBefore);
}
catch (IOException e)
{
Modified: cassandra/trunk/test/system/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/__init__.py?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/system/__init__.py (original)
+++ cassandra/trunk/test/system/__init__.py Tue Mar 22 19:36:18 2011
@@ -161,8 +161,9 @@ class ThriftTester(BaseTester):
Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type'),
Cassandra.CfDef('Keyspace1', 'Counter1', default_validation_class='CounterColumnType'),
Cassandra.CfDef('Keyspace1', 'SuperCounter1', column_type='Super', default_validation_class='CounterColumnType'),
- Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
- Cassandra.CfDef('Keyspace1', 'Indexed2', comparator_type='TimeUUIDType', column_metadata=[Cassandra.ColumnDef(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
+ Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate_index')]),
+ Cassandra.CfDef('Keyspace1', 'Indexed2', comparator_type='TimeUUIDType', column_metadata=[Cassandra.ColumnDef(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, 'LongType', Cassandra.IndexType.KEYS)]),
+ Cassandra.CfDef('Keyspace1', 'Indexed3', comparator_type='TimeUUIDType', column_metadata=[Cassandra.ColumnDef(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, 'UTF8Type', Cassandra.IndexType.KEYS)]),
])
Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Tue Mar 22 19:36:18 2011
@@ -1767,7 +1767,22 @@ class TestMutations(ThriftTester):
assert result[0].key == 'key3'
assert len(result[0].columns) == 2, result[0].columns
- cp = ColumnParent('Indexed2')
+ def test_index_scan_uuid_names(self):
+ _set_keyspace('Keyspace1')
+ sp = SlicePredicate(slice_range=SliceRange('', ''))
+ cp = ColumnParent('Indexed3') # timeuuid name, utf8 values
+ u = uuid.UUID('00000000-0000-1000-0000-000000000000').bytes
+ u2 = uuid.UUID('00000000-0000-1000-0000-000000000001').bytes
+ client.insert('key1', ColumnParent('Indexed3'), Column(u, 'a', 0), ConsistencyLevel.ONE)
+ client.insert('key1', ColumnParent('Indexed3'), Column(u2, 'b', 0), ConsistencyLevel.ONE)
+ # name comparator + data validator of incompatible types -- see CASSANDRA-2347
+ clause = IndexClause([IndexExpression(u, IndexOperator.EQ, 'a'),
+ IndexExpression(u2, IndexOperator.EQ, 'b')], '')
+ result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+ assert len(result) == 1, result
+
+ cp = ColumnParent('Indexed2') # timeuuid name, long values
+
# name must be valid (TimeUUID)
clause = IndexClause([IndexExpression('foo', IndexOperator.EQ, uuid.UUID('00000000-0000-1000-0000-000000000000').bytes)], '')
_expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)
Modified: cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java Tue Mar 22 19:36:18 2011
@@ -20,15 +20,21 @@ package org.apache.cassandra;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class CleanupHelper extends SchemaLoader
{
@@ -76,4 +82,30 @@ public class CleanupHelper extends Schem
throw new RuntimeException(e);
}
}
+
+ protected void insertData(String keyspace, String columnFamily, int offset, int numberOfRows) throws IOException
+ {
+ for (int i = offset; i < offset + numberOfRows; i++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes("key" + i);
+ RowMutation rowMutation = new RowMutation(keyspace, key);
+ QueryPath path = new QueryPath(columnFamily, null, ByteBufferUtil.bytes("col" + i));
+
+ rowMutation.add(path, ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
+ rowMutation.applyUnsafe();
+ }
+ }
+
+ /* usually used to populate the cache */
+ protected void readData(String keyspace, String columnFamily, int offset, int numberOfRows) throws IOException
+ {
+ ColumnFamilyStore store = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+ for (int i = offset; i < offset + numberOfRows; i++)
+ {
+ DecoratedKey key = Util.dk("key" + i);
+ QueryPath path = new QueryPath(columnFamily, null, ByteBufferUtil.bytes("col" + i));
+
+ store.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ }
+ }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Tue Mar 22 19:36:18 2011
@@ -22,6 +22,8 @@ package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
@@ -29,22 +31,71 @@ import org.junit.Test;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
public class KeyCacheTest extends CleanupHelper
{
private static final String TABLE1 = "KeyCacheSpace";
+ private static final String COLUMN_FAMILY1 = "Standard1";
+ private static final String COLUMN_FAMILY2 = "Standard2";
@Test
public void testKeyCache50() throws IOException, ExecutionException, InterruptedException
{
- testKeyCache("Standard1", 64);
+ testKeyCache(COLUMN_FAMILY1, 64);
}
@Test
public void testKeyCache100() throws IOException, ExecutionException, InterruptedException
{
- testKeyCache("Standard2", 128);
+ testKeyCache(COLUMN_FAMILY2, 128);
+ }
+
+ @Test
+ public void testKeyCacheLoad() throws Exception
+ {
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore store = Table.open(TABLE1).getColumnFamilyStore(COLUMN_FAMILY2);
+
+ // empty the cache
+ store.invalidateKeyCache();
+ assert store.getKeyCacheSize() == 0;
+
+ // insert data and force to disk
+ insertData(TABLE1, COLUMN_FAMILY2, 0, 100);
+ store.forceBlockingFlush();
+
+ // populate the cache
+ readData(TABLE1, COLUMN_FAMILY2, 0, 100);
+ assert store.getKeyCacheSize() == 100;
+
+ // really? our caches don't implement the map interface? (hence no .addAll)
+ Map<Pair<Descriptor, DecoratedKey>, Long> savedMap = new HashMap<Pair<Descriptor, DecoratedKey>, Long>();
+ for (Map.Entry<Pair<Descriptor, DecoratedKey>, Long> entry : store.getKeyCache().getEntrySet())
+ {
+ savedMap.put(entry.getKey(), entry.getValue());
+ }
+
+ // force the cache to disk
+ store.keyCache.submitWrite().get();
+
+ // empty the cache again to make sure values came from disk
+ store.invalidateKeyCache();
+ assert store.getKeyCacheSize() == 0;
+
+ // load the cache from disk
+ store.unregisterMBean(); // unregistering old MBean to test how key cache will be loaded
+ ColumnFamilyStore newStore = ColumnFamilyStore.createColumnFamilyStore(Table.open(TABLE1), COLUMN_FAMILY2);
+ assert newStore.getKeyCacheSize() == 100;
+
+ assert savedMap.size() == 100;
+ for (Map.Entry<Pair<Descriptor, DecoratedKey>, Long> entry : savedMap.entrySet())
+ {
+ assert newStore.getKeyCache().get(entry.getKey()).equals(entry.getValue());
+ }
}
public void testKeyCache(String cfName, int expectedCacheSize) throws IOException, ExecutionException, InterruptedException
@@ -87,4 +138,5 @@ public class KeyCacheTest extends Cleanu
CompactionManager.instance.submitMajor(store, 0, Integer.MAX_VALUE).get();
keyCacheSize = store.getKeyCacheCapacity();
assert keyCacheSize == 1 : keyCacheSize;
- }}
+ }
+}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java Tue Mar 22 19:36:18 2011
@@ -18,33 +18,33 @@
package org.apache.cassandra.db;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.cassandra.CleanupHelper;
+import org.junit.Test;
+import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.junit.Test;
-
public class RowCacheTest extends CleanupHelper
{
+ private String KEYSPACE = "RowCacheSpace";
+ private String COLUMN_FAMILY_WITH_CACHE = "CachedCF";
+ private String COLUMN_FAMILY_WITHOUT_CACHE = "CFWithoutCache";
+
@Test
public void testRowCache() throws Exception
{
- String KEYSPACE = "RowCacheSpace";
- String COLUMN_FAMILY_WITH_CACHE = "CachedCF";
- String COLUMN_FAMILY_WITHOUT_CACHE = "CFWithoutCache";
-
CompactionManager.instance.disableAutoCompaction();
Table table = Table.open(KEYSPACE);
ColumnFamilyStore cachedStore = table.getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE);
ColumnFamilyStore noCacheStore = table.getColumnFamilyStore(COLUMN_FAMILY_WITHOUT_CACHE);
+ // empty the row cache
+ cachedStore.invalidateRowCache();
+
// inserting 100 rows into both column families
insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
insertData(KEYSPACE, COLUMN_FAMILY_WITHOUT_CACHE, 0, 100);
@@ -109,17 +109,37 @@ public class RowCacheTest extends Cleanu
}
}
- private void insertData(String keyspace, String columnFamily, int offset, int numberOfRows) throws IOException
+ @Test
+ public void testRowCacheLoad() throws Exception
{
- for (int i = offset; i < offset + numberOfRows; i++)
- {
- ByteBuffer key = ByteBufferUtil.bytes("key" + i);
- RowMutation rowMutation = new RowMutation(keyspace, key);
- QueryPath path = new QueryPath(columnFamily, null, ByteBufferUtil.bytes("col" + i));
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore store = Table.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE);
+
+ // empty the cache
+ store.invalidateRowCache();
+ assert store.getRowCacheSize() == 0;
+
+ // insert data and fill the cache
+ insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
+ readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
+ assert store.getRowCacheSize() == 100;
- rowMutation.add(path, ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
- rowMutation.applyUnsafe();
+ // force the cache to disk
+ store.rowCache.submitWrite().get();
+
+ // empty the cache again to make sure values came from disk
+ store.invalidateRowCache();
+ assert store.getRowCacheSize() == 0;
+
+ // load the cache from disk
+ store.rowCache.readSaved();
+ assert store.getRowCacheSize() == 100;
+
+ for (int i = 0; i < 100; i++)
+ {
+ // verify the correct data was found
+ assert store.getRawCachedRow(Util.dk("key" + i)).getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i));
}
}
-
}