You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/03/04 18:46:20 UTC
svn commit: r1078069 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/
contrib/pig/bin/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/config/ src/java/org/...
Author: brandonwilliams
Date: Fri Mar 4 17:46:20 2011
New Revision: 1078069
URL: http://svn.apache.org/viewvc?rev=1078069&view=rev
Log:
Merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/pig/README.txt
cassandra/trunk/contrib/pig/bin/pig_cassandra
cassandra/trunk/contrib/pig/build.xml
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7:1026516-1076866
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7:1026516-1078063
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Mar 4 17:46:20 2011
@@ -15,6 +15,9 @@
* add nodetool join command (CASSANDRA-2160)
* fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244)
* initialize endpoing in gossiper earlier (CASSANDRA-2228)
+ * add ability to write to Cassandra from Pig (CASSANDRA-1828)
+ * add rpc_[min|max]_threads (CASSANDRA-2176)
+
0.7.3
* Keep endpoint state until aVeryLongTime (CASSANDRA-2115)
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Mar 4 17:46:20 2011
@@ -190,6 +190,20 @@ rpc_port: 9160
# enable or disable keepalive on rpc connections
rpc_keepalive: true
+# Cassandra uses thread-per-client for client RPC. This can
+# be expensive in memory used for thread stack for a large
+# enough number of clients. (Hence, connection pooling is
+# very, very strongly recommended.)
+#
+# Uncomment rpc_min|max|thread to set request pool size.
+# You would primarily set max as a safeguard against misbehaved
+# clients; if you do hit the max, Cassandra will block until
+# one disconnects before accepting more. The defaults are
+# min of 16 and max unlimited.
+#
+# rpc_min_threads: 16
+# rpc_max_threads: 2048
+
# uncomment to set socket buffer sizes on rpc connections
# rpc_send_buff_size_in_bytes:
# rpc_recv_buff_size_in_bytes:
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1076866
+/cassandra/branches/cassandra-0.7/contrib:1026516-1078063
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Fri Mar 4 17:46:20 2011
@@ -1,4 +1,5 @@
-A Pig LoadFunc that reads all columns from a given ColumnFamily.
+A Pig storage class that reads all columns from a given ColumnFamily, or writes
+properly formatted results into a ColumnFamily.
Setup:
@@ -7,10 +8,15 @@ configuration and set the PIG_HOME and J
variables to the location of a Pig >= 0.7.0 install and your Java
install.
+NOTE: if you intend to _output_ to Cassandra, until there is a Pig release that
+uses jackson > 1.0.1 (see https://issues.apache.org/jira/browse/PIG-1863) you
+will need to build Pig yourself with jackson 1.4. To do this, edit Pig's
+ivy/libraries.properties, and run ant.
+
If you would like to run using the Hadoop backend, you should
also set PIG_CONF_DIR to the location of your Hadoop config.
-FInally, set the following as environment variables (uppercase,
+Finally, set the following as environment variables (uppercase,
underscored), or as Hadoop configuration variables (lowercase, dotted):
* PIG_RPC_PORT or cassandra.thrift.port : the port thrift is listening on
* PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect to
@@ -40,3 +46,11 @@ grunt> namecounts = FOREACH namegroups G
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
+
+Outputting to Cassandra requires the same format from input, so the simplest example is:
+
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();
+grunt> STORE rows into 'cassandra://Keyspace1/Standard2' USING CassandraStorage();
+
+Which will copy the ColumnFamily. Note that the destination ColumnFamily must
+already exist for this to work.
Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/bin/pig_cassandra (original)
+++ cassandra/trunk/contrib/pig/bin/pig_cassandra Fri Mar 4 17:46:20 2011
@@ -38,7 +38,7 @@ if [ "x$PIG_HOME" = "x" ]; then
fi
# pig jar.
-PIG_JAR=$PIG_HOME/pig*core.jar
+PIG_JAR=$PIG_HOME/pig*.jar
if [ ! -e $PIG_JAR ]; then
echo "Unable to locate Pig jar" >&2
exit 1
Modified: cassandra/trunk/contrib/pig/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Fri Mar 4 17:46:20 2011
@@ -21,7 +21,7 @@
<!-- stores the environment for locating PIG_HOME -->
<property environment="env" />
<property name="cassandra.dir" value="../.." />
- <property name="cassandra.lib" value="" />
+ <property name="cassandra.lib" value="${cassandra.dir}/lib" />
<property name="cassandra.classes"
value="${cassandra.dir}/build/classes" />
<property name="cassandra.classes.main"
@@ -36,8 +36,9 @@
<path id="pig.classpath">
<fileset file="${env.PIG_HOME}/pig*.jar" />
- <fileset dir="${cassandra.dir}/lib">
+ <fileset dir="${cassandra.lib}">
<include name="libthrift*.jar" />
+ <include name="avro*.jar" />
</fileset>
<fileset dir="${cassandra.dir}/build/lib/jars">
<include name="google-collections*.jar" />
Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Mar 4 17:46:20 2011
@@ -20,33 +20,36 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.avro.Deletion;
+import org.apache.cassandra.avro.ColumnOrSuperColumn;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.LoadFunc;
+import org.apache.pig.*;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
/**
* A LoadFunc wrapping ColumnFamilyInputFormat.
*
* A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
*/
-public class CassandraStorage extends LoadFunc
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown
{
// system environment variables that can be set to configure connection info:
// alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
@@ -56,9 +59,11 @@ public class CassandraStorage extends Lo
private final static ByteBuffer BOUND = FBUtilities.EMPTY_BYTE_BUFFER;
private final static int LIMIT = 1024;
+ private static final Log logger = LogFactory.getLog(CassandraStorage.class);
private Configuration conf;
private RecordReader reader;
+ private RecordWriter writer;
@Override
public Tuple getNext() throws IOException
@@ -116,8 +121,7 @@ public class CassandraStorage extends Lo
@Override
public InputFormat getInputFormat()
{
- ColumnFamilyInputFormat inputFormat = new ColumnFamilyInputFormat();
- return inputFormat;
+ return new ColumnFamilyInputFormat();
}
@Override
@@ -126,38 +130,50 @@ public class CassandraStorage extends Lo
this.reader = reader;
}
- @Override
- public void setLocation(String location, Job job) throws IOException
+ private String[] parseLocation(String location) throws IOException
{
// parse uri into keyspace and columnfamily
- String ksname, cfname;
+ String names[];
try
{
if (!location.startsWith("cassandra://"))
throw new Exception("Bad scheme.");
String[] parts = location.split("/+");
- ksname = parts[1];
- cfname = parts[2];
+ names = new String[]{ parts[1], parts[2] };
}
catch (Exception e)
{
throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
}
+ return names;
+ }
- // and configure
- SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
- SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
- conf = job.getConfiguration();
- ConfigHelper.setInputSlicePredicate(conf, predicate);
- ConfigHelper.setInputColumnFamily(conf, ksname, cfname);
-
- // check the environment for connection information
+ private void setConnectionInformation() throws IOException
+ {
if (System.getenv(PIG_RPC_PORT) != null)
ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ else
+ throw new IOException("PIG_RPC_PORT environment variable not set");
if (System.getenv(PIG_INITIAL_ADDRESS) != null)
ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+ else
+ throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
if (System.getenv(PIG_PARTITIONER) != null)
ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
+ else
+ throw new IOException("PIG_PARTITIONER environment variable not set");
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException
+ {
+ SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
+ SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+ conf = job.getConfiguration();
+ ConfigHelper.setInputSlicePredicate(conf, predicate);
+ String[] names = parseLocation(location);
+ ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+ setConnectionInformation();
}
@Override
@@ -165,4 +181,136 @@ public class CassandraStorage extends Lo
{
return location;
}
+
+ /* StoreFunc methods */
+ public void setStoreFuncUDFContextSignature(String signature)
+ {
+ }
+
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+ {
+ return relativeToAbsolutePath(location, curDir);
+ }
+
+ public void setStoreLocation(String location, Job job) throws IOException
+ {
+ conf = job.getConfiguration();
+ String[] names = parseLocation(location);
+ ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+ setConnectionInformation();
+ }
+
+ public OutputFormat getOutputFormat()
+ {
+ return new ColumnFamilyOutputFormat();
+ }
+
+ public void checkSchema(ResourceSchema schema) throws IOException
+ {
+ // we don't care about types, they all get casted to ByteBuffers
+ }
+
+ public void prepareToWrite(RecordWriter writer)
+ {
+ this.writer = writer;
+ }
+
+ private ByteBuffer objToBB(Object o)
+ {
+ if (o == null)
+ return (ByteBuffer)o;
+ if (o instanceof java.lang.String)
+ o = new DataByteArray((String)o);
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ }
+
+ public void putNext(Tuple t) throws ExecException, IOException
+ {
+ ByteBuffer key = objToBB(t.get(0));
+ DefaultDataBag pairs = (DefaultDataBag) t.get(1);
+ ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
+
+ try
+ {
+ for (Tuple pair : pairs)
+ {
+ Mutation mutation = new Mutation();
+ if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
+ {
+ org.apache.cassandra.avro.SuperColumn sc = new org.apache.cassandra.avro.SuperColumn();
+ sc.name = objToBB(pair.get(0));
+ ArrayList<org.apache.cassandra.avro.Column> columns = new ArrayList<org.apache.cassandra.avro.Column>();
+ for (Tuple subcol : (DefaultDataBag) pair.get(1))
+ {
+ org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
+ column.name = objToBB(subcol.get(0));
+ column.value = objToBB(subcol.get(1));
+ column.timestamp = System.currentTimeMillis() * 1000;
+ columns.add(column);
+ }
+ if (columns.isEmpty()) // a deletion
+ {
+ mutation.deletion = new Deletion();
+ mutation.deletion.super_column = objToBB(pair.get(0));
+ mutation.deletion.timestamp = System.currentTimeMillis() * 1000;
+ }
+ else
+ {
+ sc.columns = columns;
+ mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+ mutation.column_or_supercolumn.super_column = sc;
+ }
+ }
+ else // assume column since it could be anything else
+ {
+ if (pair.get(1) == null)
+ {
+ mutation.deletion = new Deletion();
+ mutation.deletion.predicate = new org.apache.cassandra.avro.SlicePredicate();
+ mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
+ mutation.deletion.timestamp = System.currentTimeMillis() * 1000;
+ }
+ else
+ {
+ org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
+ column.name = objToBB(pair.get(0));
+ column.value = objToBB(pair.get(1));
+ column.timestamp = System.currentTimeMillis() * 1000;
+ mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+ mutation.column_or_supercolumn.column = column;
+ mutationList.add(mutation);
+ }
+ }
+ mutationList.add(mutation);
+ }
+ }
+ catch (ClassCastException e)
+ {
+ throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+ }
+ try
+ {
+ writer.write(key, mutationList);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ public void cleanupOnFailure(String failure, Job job)
+ {
+ }
+
+ /* LoadPushDown methods */
+
+ public List<OperatorSet> getFeatures() {
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+
+ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException
+ {
+ return new RequiredFieldResponse(true);
+ }
+
}
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1078063
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1078063
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1078063
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1078063
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1078063
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Mar 4 17:46:20 2011
@@ -66,6 +66,8 @@ public class Config
public String rpc_address;
public Integer rpc_port = 9160;
public Boolean rpc_keepalive = true;
+ public Integer rpc_min_threads = 16;
+ public Integer rpc_max_threads = Integer.MAX_VALUE;
public Integer rpc_send_buff_size_in_bytes;
public Integer rpc_recv_buff_size_in_bytes;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar 4 17:46:20 2011
@@ -1049,6 +1049,16 @@ public class DatabaseDescriptor
return conf.rpc_keepalive;
}
+ public static Integer getRpcMinThreads()
+ {
+ return conf.rpc_min_threads;
+ }
+
+ public static Integer getRpcMaxThreads()
+ {
+ return conf.rpc_max_threads;
+ }
+
public static Integer getRpcSendBufferSize()
{
return conf.rpc_send_buff_size_in_bytes;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Fri Mar 4 17:46:20 2011
@@ -221,25 +221,23 @@ public class ColumnFamily implements ICo
public void addColumn(IColumn column)
{
ByteBuffer name = column.name();
- IColumn oldColumn = columns.putIfAbsent(name, column);
- if (oldColumn != null)
+ IColumn oldColumn;
+ while ((oldColumn = columns.putIfAbsent(name, column)) != null)
{
if (oldColumn instanceof SuperColumn)
{
((SuperColumn) oldColumn).putColumn(column);
+ break; // Delegated to SuperColumn
}
else
{
// calculate reconciled col from old (existing) col and new col
IColumn reconciledColumn = column.reconcile(oldColumn);
- while (!columns.replace(name, oldColumn, reconciledColumn))
- {
- // if unable to replace, then get updated old (existing) col
- oldColumn = columns.get(name);
- // re-calculate reconciled col from updated old col and original new col
- reconciledColumn = column.reconcile(oldColumn);
- // try to re-update value, again
- }
+ if (columns.replace(name, oldColumn, reconciledColumn))
+ break;
+
+ // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+ // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Mar 4 17:46:20 2011
@@ -167,19 +167,16 @@ public class SuperColumn implements ICol
assert column instanceof Column : "A super column can only contain simple columns";
ByteBuffer name = column.name();
- IColumn oldColumn = columns_.putIfAbsent(name, column);
- if (oldColumn != null)
+ IColumn oldColumn;
+ while ((oldColumn = columns_.putIfAbsent(name, column)) != null)
{
IColumn reconciledColumn = column.reconcile(oldColumn);
- while (!columns_.replace(name, oldColumn, reconciledColumn))
- {
- // if unable to replace, then get updated old (existing) col
- oldColumn = columns_.get(name);
- // re-calculate reconciled col from updated old col and original new col
- reconciledColumn = column.reconcile(oldColumn);
- // try to re-update value, again
- }
- }
+ if (columns_.replace(name, oldColumn, reconciledColumn))
+ break;
+
+ // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+ // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
+ }
}
/*
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Mar 4 17:46:20 2011
@@ -301,7 +301,7 @@ public class SSTableReader extends SSTab
long dataPosition = input.readLong();
if (key != null)
{
- DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.clone(key));
+ DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key);
if (recreatebloom)
bf.add(decoratedKey.key);
if (shouldAddEntry)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Mar 4 17:46:20 2011
@@ -82,8 +82,6 @@ public abstract class AbstractCassandraD
protected int listenPort;
protected volatile boolean isRunning = false;
- public static final int MIN_WORKER_THREADS = 64;
-
/**
* This is a hook for concrete daemons to initialize themselves suitably.
*
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Mar 4 17:46:20 2011
@@ -133,7 +133,8 @@ public class CassandraDaemon extends org
// ThreadPool Server
CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
- options.minWorkerThreads = MIN_WORKER_THREADS;
+ options.minWorkerThreads = DatabaseDescriptor.getRpcMinThreads();
+ options.maxWorkerThreads = DatabaseDescriptor.getRpcMaxThreads();
ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
options.minWorkerThreads,
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Fri Mar 4 17:46:20 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +32,7 @@ import org.apache.thrift.TProcessorFacto
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.*;
/**
@@ -59,6 +57,9 @@ public class CustomTThreadPoolServer ext
// Server options
private Options options_;
+ //Track and Limit the number of connected clients
+ private final AtomicInteger activeClients = new AtomicInteger(0);
+
// Customizable server options
public static class Options
{
@@ -101,10 +102,24 @@ public class CustomTThreadPoolServer ext
stopped_ = false;
while (!stopped_)
{
+ // block until we are under max clients
+ while (activeClients.get() >= options_.maxWorkerThreads)
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
int failureCount = 0;
try
{
TTransport client = serverTransport_.accept();
+ activeClients.incrementAndGet();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
}
@@ -116,6 +131,9 @@ public class CustomTThreadPoolServer ext
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
+
+ if (activeClients.get() >= options_.maxWorkerThreads)
+ LOGGER.warn("Maximum number of clients " + options_.maxWorkerThreads + " reached");
}
executorService_.shutdown();
@@ -203,6 +221,10 @@ public class CustomTThreadPoolServer ext
{
LOGGER.error("Error occurred during processing of message.", x);
}
+ finally
+ {
+ activeClients.decrementAndGet();
+ }
if (inputTransport != null)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Fri Mar 4 17:46:20 2011
@@ -188,23 +188,28 @@ public class ByteBufferUtil
throw new RuntimeException(e);
}
}
-
- public static ByteBuffer clone(ByteBuffer o)
+
+ /**
+ * @return a new copy of the data in @param buffer
+ * USUALLY YOU SHOULD USE ByteBuffer.duplicate() INSTEAD, which creates a new Buffer
+ * (so you can mutate its position without affecting the original) without copying the underlying array.
+ */
+ public static ByteBuffer clone(ByteBuffer buffer)
{
- assert o != null;
+ assert buffer != null;
- if (o.remaining() == 0)
+ if (buffer.remaining() == 0)
return EMPTY_BYTE_BUFFER;
- ByteBuffer clone = ByteBuffer.allocate(o.remaining());
+ ByteBuffer clone = ByteBuffer.allocate(buffer.remaining());
- if (o.hasArray())
+ if (buffer.hasArray())
{
- System.arraycopy(o.array(), o.arrayOffset() + o.position(), clone.array(), 0, o.remaining());
+ System.arraycopy(buffer.array(), buffer.arrayOffset() + buffer.position(), clone.array(), 0, buffer.remaining());
}
else
{
- clone.put(o.duplicate());
+ clone.put(buffer.duplicate());
clone.flip();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Fri Mar 4 17:46:20 2011
@@ -289,7 +289,7 @@ public class FBUtilities
MessageDigest messageDigest = localMD5Digest.get();
for(ByteBuffer block : data)
{
- messageDigest.update(ByteBufferUtil.clone(block));
+ messageDigest.update(block.duplicate());
}
return messageDigest.digest();