You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/03/04 18:46:20 UTC

svn commit: r1078069 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/ contrib/pig/bin/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/config/ src/java/org/...

Author: brandonwilliams
Date: Fri Mar  4 17:46:20 2011
New Revision: 1078069

URL: http://svn.apache.org/viewvc?rev=1078069&view=rev
Log:
Merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/README.txt
    cassandra/trunk/contrib/pig/bin/pig_cassandra
    cassandra/trunk/contrib/pig/build.xml
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7:1026516-1076866
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7:1026516-1078063
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Mar  4 17:46:20 2011
@@ -15,6 +15,9 @@
  * add nodetool join command (CASSANDRA-2160)
  * fix secondary indexes on pre-existing or streamed data (CASSANDRA-2244)
  * initialize endpoint in gossiper earlier (CASSANDRA-2228)
+ * add ability to write to Cassandra from Pig (CASSANDRA-1828)
+ * add rpc_[min|max]_threads (CASSANDRA-2176)
+
 
 0.7.3
  * Keep endpoint state until aVeryLongTime (CASSANDRA-2115)

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Mar  4 17:46:20 2011
@@ -190,6 +190,20 @@ rpc_port: 9160
 # enable or disable keepalive on rpc connections
 rpc_keepalive: true
 
+# Cassandra uses thread-per-client for client RPC.  This can
+# be expensive in memory used for thread stack for a large
+# enough number of clients.  (Hence, connection pooling is
+# very, very strongly recommended.)
+# 
+# Uncomment rpc_min_threads|rpc_max_threads to set request pool size.
+# You would primarily set max as a safeguard against misbehaved
+# clients; if you do hit the max, Cassandra will block until
+# one disconnects before accepting more.  The defaults are
+# min of 16 and max unlimited.
+#
+# rpc_min_threads: 16
+# rpc_max_threads: 2048
+
 # uncomment to set socket buffer sizes on rpc connections
 # rpc_send_buff_size_in_bytes:
 # rpc_recv_buff_size_in_bytes:

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1076866
+/cassandra/branches/cassandra-0.7/contrib:1026516-1078063
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Fri Mar  4 17:46:20 2011
@@ -1,4 +1,5 @@
-A Pig LoadFunc that reads all columns from a given ColumnFamily.
+A Pig storage class that reads all columns from a given ColumnFamily, or writes
+properly formatted results into a ColumnFamily.
 
 Setup:
 
@@ -7,10 +8,15 @@ configuration and set the PIG_HOME and J
 variables to the location of a Pig >= 0.7.0 install and your Java
 install. 
 
+NOTE: if you intend to _output_ to Cassandra, until there is a Pig release that
+uses jackson > 1.0.1 (see https://issues.apache.org/jira/browse/PIG-1863) you
+will need to build Pig yourself with jackson 1.4.  To do this, edit Pig's
+ivy/libraries.properties, and run ant.
+
 If you would like to run using the Hadoop backend, you should
 also set PIG_CONF_DIR to the location of your Hadoop config.
 
-FInally, set the following as environment variables (uppercase,
+Finally, set the following as environment variables (uppercase,
 underscored), or as Hadoop configuration variables (lowercase, dotted):
 * PIG_RPC_PORT or cassandra.thrift.port : the port thrift is listening on 
 * PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect to
@@ -40,3 +46,11 @@ grunt> namecounts = FOREACH namegroups G
 grunt> orderednames = ORDER namecounts BY $0;
 grunt> topnames = LIMIT orderednames 50;
 grunt> dump topnames;
+
+Outputting to Cassandra requires the same format as input, so the simplest example is:
+
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();
+grunt> STORE rows into 'cassandra://Keyspace1/Standard2' USING CassandraStorage();
+
+Which will copy the ColumnFamily.  Note that the destination ColumnFamily must
+already exist for this to work.

Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/bin/pig_cassandra (original)
+++ cassandra/trunk/contrib/pig/bin/pig_cassandra Fri Mar  4 17:46:20 2011
@@ -38,7 +38,7 @@ if [ "x$PIG_HOME" = "x" ]; then
 fi
 
 # pig jar.
-PIG_JAR=$PIG_HOME/pig*core.jar
+PIG_JAR=$PIG_HOME/pig*.jar
 if [ ! -e $PIG_JAR ]; then
     echo "Unable to locate Pig jar" >&2
     exit 1

Modified: cassandra/trunk/contrib/pig/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Fri Mar  4 17:46:20 2011
@@ -21,7 +21,7 @@
     <!-- stores the environment for locating PIG_HOME -->
     <property environment="env" />
     <property name="cassandra.dir" value="../.." />
-    <property name="cassandra.lib" value="" />
+    <property name="cassandra.lib" value="${cassandra.dir}/lib" />
     <property name="cassandra.classes"
               value="${cassandra.dir}/build/classes" />
     <property name="cassandra.classes.main"
@@ -36,8 +36,9 @@
 
     <path id="pig.classpath">
         <fileset file="${env.PIG_HOME}/pig*.jar" />
-        <fileset dir="${cassandra.dir}/lib">
+        <fileset dir="${cassandra.lib}">
             <include name="libthrift*.jar" />
+            <include name="avro*.jar" />
         </fileset>
         <fileset dir="${cassandra.dir}/build/lib/jars">
             <include name="google-collections*.jar" />

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Mar  4 17:46:20 2011
@@ -20,33 +20,36 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.avro.Deletion;
+import org.apache.cassandra.avro.ColumnOrSuperColumn;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.*;
 
-import org.apache.pig.LoadFunc;
+import org.apache.pig.*;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DefaultDataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 
 /**
  * A LoadFunc wrapping ColumnFamilyInputFormat.
  *
  * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
  */
-public class CassandraStorage extends LoadFunc
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown
 {
     // system environment variables that can be set to configure connection info:
     // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
@@ -56,9 +59,11 @@ public class CassandraStorage extends Lo
 
     private final static ByteBuffer BOUND = FBUtilities.EMPTY_BYTE_BUFFER;
     private final static int LIMIT = 1024;
+    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
     private Configuration conf;
     private RecordReader reader;
+    private RecordWriter writer;
 
     @Override
     public Tuple getNext() throws IOException
@@ -116,8 +121,7 @@ public class CassandraStorage extends Lo
     @Override
     public InputFormat getInputFormat()
     {
-        ColumnFamilyInputFormat inputFormat = new ColumnFamilyInputFormat();
-        return inputFormat;
+        return new ColumnFamilyInputFormat();
     }
 
     @Override
@@ -126,38 +130,50 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-    @Override
-    public void setLocation(String location, Job job) throws IOException
+    private String[] parseLocation(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
-        String ksname, cfname;
+        String names[];
         try
         {
             if (!location.startsWith("cassandra://"))
                 throw new Exception("Bad scheme.");
             String[] parts = location.split("/+");
-            ksname = parts[1];
-            cfname = parts[2];
+            names = new String[]{ parts[1], parts[2] };
         }
         catch (Exception e)
         {
             throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
         }
+       return names;
+    }
 
-        // and configure
-        SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
-        conf = job.getConfiguration();
-        ConfigHelper.setInputSlicePredicate(conf, predicate);
-        ConfigHelper.setInputColumnFamily(conf, ksname, cfname);
-
-        // check the environment for connection information
+    private void setConnectionInformation() throws IOException
+    {
         if (System.getenv(PIG_RPC_PORT) != null)
             ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        else
+            throw new IOException("PIG_RPC_PORT environment variable not set");
         if (System.getenv(PIG_INITIAL_ADDRESS) != null)
             ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+        else
+            throw new IOException("PIG_INITIAL_ADDRESS environment variable not set");
         if (System.getenv(PIG_PARTITIONER) != null)
             ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
+        else
+            throw new IOException("PIG_PARTITIONER environment variable not set");
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException
+    {
+        SliceRange range = new SliceRange(BOUND, BOUND, false, LIMIT);
+        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+        conf = job.getConfiguration();
+        ConfigHelper.setInputSlicePredicate(conf, predicate);
+        String[] names = parseLocation(location);
+        ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+        setConnectionInformation();
     }
 
     @Override
@@ -165,4 +181,136 @@ public class CassandraStorage extends Lo
     {
         return location;
     }
+
+    /* StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        String[] names = parseLocation(location);
+        ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+        setConnectionInformation();
+    }
+
+    public OutputFormat getOutputFormat()
+    {
+        return new ColumnFamilyOutputFormat();
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get casted to ByteBuffers
+    }
+
+    public void prepareToWrite(RecordWriter writer)
+    {
+        this.writer = writer;
+    }
+
+    private ByteBuffer objToBB(Object o)
+    {
+        if (o == null)
+            return (ByteBuffer)o;
+        if (o instanceof java.lang.String)
+            o = new DataByteArray((String)o);
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
+
+    public void putNext(Tuple t) throws ExecException, IOException
+    {
+        ByteBuffer key = objToBB(t.get(0));
+        DefaultDataBag pairs = (DefaultDataBag) t.get(1);
+        ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
+
+        try
+        {
+            for (Tuple pair : pairs)
+            {
+               Mutation mutation = new Mutation();
+               if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
+               {
+                   org.apache.cassandra.avro.SuperColumn sc = new org.apache.cassandra.avro.SuperColumn();
+                   sc.name = objToBB(pair.get(0));
+                   ArrayList<org.apache.cassandra.avro.Column> columns = new ArrayList<org.apache.cassandra.avro.Column>();
+                   for (Tuple subcol : (DefaultDataBag) pair.get(1))
+                   {
+                       org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
+                       column.name = objToBB(subcol.get(0));
+                       column.value = objToBB(subcol.get(1));
+                       column.timestamp = System.currentTimeMillis() * 1000;
+                       columns.add(column);
+                   }
+                   if (columns.isEmpty()) // a deletion
+                   {
+                       mutation.deletion = new Deletion();
+                       mutation.deletion.super_column = objToBB(pair.get(0));
+                       mutation.deletion.timestamp = System.currentTimeMillis() * 1000;
+                   }
+                   else
+                   {
+                       sc.columns = columns;
+                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+                       mutation.column_or_supercolumn.super_column = sc;
+                   }
+               }
+               else // assume column since it could be anything else
+               {
+                   if (pair.get(1) == null)
+                   {
+                       mutation.deletion = new Deletion();
+                       mutation.deletion.predicate = new org.apache.cassandra.avro.SlicePredicate();
+                       mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
+                       mutation.deletion.timestamp = System.currentTimeMillis() * 1000;
+                   }
+                   else
+                   {
+                       org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column();
+                       column.name = objToBB(pair.get(0));
+                       column.value = objToBB(pair.get(1));
+                       column.timestamp = System.currentTimeMillis() * 1000;
+                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+                       mutation.column_or_supercolumn.column = column;
+                       mutationList.add(mutation);
+                   }
+               }
+               mutationList.add(mutation);
+            }
+        }
+        catch (ClassCastException e)
+        {
+            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+        }
+        try
+        {
+            writer.write(key, mutationList);
+        }
+        catch (InterruptedException e)
+        {
+           throw new IOException(e);
+        }
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    /* LoadPushDown methods */
+
+    public List<OperatorSet> getFeatures() {
+        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+    }
+
+    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException
+    {
+        return new RequiredFieldResponse(true);
+    }
+
 }

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1078063
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1078063
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1078063
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1078063
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 17:46:20 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1076866
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1078063
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Mar  4 17:46:20 2011
@@ -66,6 +66,8 @@ public class Config
     public String rpc_address;
     public Integer rpc_port = 9160;
     public Boolean rpc_keepalive = true;
+    public Integer rpc_min_threads = 16;
+    public Integer rpc_max_threads = Integer.MAX_VALUE;
     public Integer rpc_send_buff_size_in_bytes;
     public Integer rpc_recv_buff_size_in_bytes;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Mar  4 17:46:20 2011
@@ -1049,6 +1049,16 @@ public class DatabaseDescriptor
         return conf.rpc_keepalive;
     }
 
+    public static Integer getRpcMinThreads()
+    {
+        return conf.rpc_min_threads;
+    }
+    
+    public static Integer getRpcMaxThreads()
+    {
+        return conf.rpc_max_threads;
+    }
+    
     public static Integer getRpcSendBufferSize()
     {
         return conf.rpc_send_buff_size_in_bytes;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Fri Mar  4 17:46:20 2011
@@ -221,25 +221,23 @@ public class ColumnFamily implements ICo
     public void addColumn(IColumn column)
     {
         ByteBuffer name = column.name();
-        IColumn oldColumn = columns.putIfAbsent(name, column);
-        if (oldColumn != null)
+        IColumn oldColumn;
+        while ((oldColumn = columns.putIfAbsent(name, column)) != null)
         {
             if (oldColumn instanceof SuperColumn)
             {
                 ((SuperColumn) oldColumn).putColumn(column);
+                break;  // Delegated to SuperColumn
             }
             else
             {
                 // calculate reconciled col from old (existing) col and new col
                 IColumn reconciledColumn = column.reconcile(oldColumn);
-                while (!columns.replace(name, oldColumn, reconciledColumn))
-                {
-                    // if unable to replace, then get updated old (existing) col
-                    oldColumn = columns.get(name);
-                    // re-calculate reconciled col from updated old col and original new col
-                    reconciledColumn = column.reconcile(oldColumn);
-                    // try to re-update value, again
-                }
+                if (columns.replace(name, oldColumn, reconciledColumn))
+                    break;
+
+                // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+                // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
             }
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Mar  4 17:46:20 2011
@@ -167,19 +167,16 @@ public class SuperColumn implements ICol
         assert column instanceof Column : "A super column can only contain simple columns";
 
         ByteBuffer name = column.name();
-        IColumn oldColumn = columns_.putIfAbsent(name, column);
-        if (oldColumn != null)
+        IColumn oldColumn;
+        while ((oldColumn = columns_.putIfAbsent(name, column)) != null)
         {
             IColumn reconciledColumn = column.reconcile(oldColumn);
-            while (!columns_.replace(name, oldColumn, reconciledColumn))
-            {
-                // if unable to replace, then get updated old (existing) col
-                oldColumn = columns_.get(name);
-                // re-calculate reconciled col from updated old col and original new col
-                reconciledColumn = column.reconcile(oldColumn);
-                // try to re-update value, again
-            }
-    	}
+            if (columns_.replace(name, oldColumn, reconciledColumn))
+                break;
+
+            // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
+            // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)
+        }
     }
 
     /*

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Mar  4 17:46:20 2011
@@ -301,7 +301,7 @@ public class SSTableReader extends SSTab
                 long dataPosition = input.readLong();
                 if (key != null)
                 {
-                    DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.clone(key));
+                    DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key);
                     if (recreatebloom)
                         bf.add(decoratedKey.key);
                     if (shouldAddEntry)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Mar  4 17:46:20 2011
@@ -82,8 +82,6 @@ public abstract class AbstractCassandraD
     protected int listenPort;
     protected volatile boolean isRunning = false;
     
-    public static final int MIN_WORKER_THREADS = 64;
-
     /**
      * This is a hook for concrete daemons to initialize themselves suitably.
      *

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Mar  4 17:46:20 2011
@@ -133,7 +133,8 @@ public class CassandraDaemon extends org
 
             // ThreadPool Server
             CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
-            options.minWorkerThreads = MIN_WORKER_THREADS;
+            options.minWorkerThreads = DatabaseDescriptor.getRpcMinThreads();
+            options.maxWorkerThreads = DatabaseDescriptor.getRpcMaxThreads();
 
             ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
                     options.minWorkerThreads,

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Fri Mar  4 17:46:20 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +32,7 @@ import org.apache.thrift.TProcessorFacto
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.*;
 
 
 /**
@@ -59,6 +57,9 @@ public class CustomTThreadPoolServer ext
     // Server options
     private Options options_;
 
+    //Track and Limit the number of connected clients
+    private final AtomicInteger activeClients = new AtomicInteger(0);
+    
     // Customizable server options
     public static class Options
     {
@@ -101,10 +102,24 @@ public class CustomTThreadPoolServer ext
         stopped_ = false;
         while (!stopped_)
         {
+            // block until we are under max clients
+            while (activeClients.get() >= options_.maxWorkerThreads)
+            {
+                try
+                {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+
             int failureCount = 0;
             try
             {
                 TTransport client = serverTransport_.accept();
+                activeClients.incrementAndGet();
                 WorkerProcess wp = new WorkerProcess(client);
                 executorService_.execute(wp);
             }
@@ -116,6 +131,9 @@ public class CustomTThreadPoolServer ext
                     LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
                 }
             }
+
+            if (activeClients.get() >= options_.maxWorkerThreads)
+                LOGGER.warn("Maximum number of clients " + options_.maxWorkerThreads + " reached");
         }
 
         executorService_.shutdown();
@@ -203,6 +221,10 @@ public class CustomTThreadPoolServer ext
             {
                 LOGGER.error("Error occurred during processing of message.", x);
             }
+            finally
+            {
+                activeClients.decrementAndGet();
+            }
 
             if (inputTransport != null)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Fri Mar  4 17:46:20 2011
@@ -188,23 +188,28 @@ public class ByteBufferUtil
            throw new RuntimeException(e);
         } 
     }
-    
-    public static ByteBuffer clone(ByteBuffer o)
+
+    /**
+     * @return a new copy of the data in @param buffer
+     * USUALLY YOU SHOULD USE ByteBuffer.duplicate() INSTEAD, which creates a new Buffer
+     * (so you can mutate its position without affecting the original) without copying the underlying array.
+     */
+    public static ByteBuffer clone(ByteBuffer buffer)
     {
-        assert o != null;
+        assert buffer != null;
         
-        if (o.remaining() == 0)
+        if (buffer.remaining() == 0)
             return EMPTY_BYTE_BUFFER;
           
-        ByteBuffer clone = ByteBuffer.allocate(o.remaining());
+        ByteBuffer clone = ByteBuffer.allocate(buffer.remaining());
 
-        if (o.hasArray())
+        if (buffer.hasArray())
         {
-            System.arraycopy(o.array(), o.arrayOffset() + o.position(), clone.array(), 0, o.remaining());
+            System.arraycopy(buffer.array(), buffer.arrayOffset() + buffer.position(), clone.array(), 0, buffer.remaining());
         }
         else
         {
-            clone.put(o.duplicate());
+            clone.put(buffer.duplicate());
             clone.flip();
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1078069&r1=1078068&r2=1078069&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Fri Mar  4 17:46:20 2011
@@ -289,7 +289,7 @@ public class FBUtilities
         MessageDigest messageDigest = localMD5Digest.get();
         for(ByteBuffer block : data)
         {
-            messageDigest.update(ByteBufferUtil.clone(block));
+            messageDigest.update(block.duplicate());
         }
 
         return messageDigest.digest();