You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/08 01:24:45 UTC

svn commit: r1090062 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Thu Apr  7 23:24:44 2011
New Revision: 1090062

URL: http://svn.apache.org/viewvc?rev=1090062&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/conf/cassandra-env.sh
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Thu Apr  7 23:24:44 2011
@@ -130,7 +130,7 @@ JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatin
 # JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
 # JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
 # JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
-# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log"
+# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"
 
 # uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
 # JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/contrib:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Thu Apr  7 23:24:44 2011
@@ -68,7 +68,7 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
-    private static String UDFCONTEXT_SCHEMA_KEY = "schema";
+    private static String UDFCONTEXT_SCHEMA_KEY = "cassandra.schema";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -168,7 +168,7 @@ public class CassandraStorage extends Lo
     private CfDef getCfDef()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(ResourceSchema.class);
+        Properties property = context.getUDFProperties(CassandraStorage.class);
         return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
     }
 
@@ -314,6 +314,7 @@ public class CassandraStorage extends Lo
         setLocationFromUri(location);
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
+        initSchema();
     }
 
     public OutputFormat getOutputFormat()
@@ -443,41 +444,46 @@ public class CassandraStorage extends Lo
 
     private void initSchema()
     {
-        Cassandra.Client client = null;
-        try
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        
+        // Only get the schema if we haven't already gotten it
+        if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY))
         {
-            client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
-            CfDef cfDef = null;
-            client.set_keyspace(keyspace);
-            KsDef ksDef = client.describe_keyspace(keyspace);
-            List<CfDef> defs = ksDef.getCf_defs();
-            for (CfDef def : defs)
+            Cassandra.Client client = null;
+            try
             {
-                if (column_family.equalsIgnoreCase(def.getName()))
+                client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+                CfDef cfDef = null;
+                client.set_keyspace(keyspace);
+                KsDef ksDef = client.describe_keyspace(keyspace);
+                List<CfDef> defs = ksDef.getCf_defs();
+                for (CfDef def : defs)
                 {
-                    cfDef = def;
-                    break;
+                    if (column_family.equalsIgnoreCase(def.getName()))
+                    {
+                        cfDef = def;
+                        break;
+                    }
                 }
+                property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InvalidRequestException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (NotFoundException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
             }
-            UDFContext context = UDFContext.getUDFContext();
-            Properties property = context.getUDFProperties(ResourceSchema.class);
-            property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (NotFoundException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
         }
     }
 

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr  7 23:24:44 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1089929,1089976
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Apr  7 23:24:44 2011
@@ -24,6 +24,8 @@ import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import org.apache.commons.collections.iterators.CollatingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +42,7 @@ import org.apache.cassandra.utils.Reduci
  * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
  * to the most recent ColumnFamily and setting up read repairs as necessary.
  */
-public class RangeSliceResponseResolver implements IResponseResolver<List<Row>>
+public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Row>>
 {
     private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
     private final String table;
@@ -62,7 +64,7 @@ public class RangeSliceResponseResolver 
 
     // Note: this deserializes the response a 2nd time if getData was called first
     // (this is not currently an issue since we don't do read repair for range queries.)
-    public List<Row> resolve() throws IOException
+    public Iterable<Row> resolve() throws IOException
     {
         CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
         {
@@ -81,7 +83,8 @@ public class RangeSliceResponseResolver 
         }
 
         // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        ReducingIterator<Pair<Row,InetAddress>, Row> iter = new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
+
+        return new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
         {
             List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
             List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
@@ -109,12 +112,6 @@ public class RangeSliceResponseResolver 
                 return new Row(key, resolved);
             }
         };
-
-        List<Row> resolvedRows = new ArrayList<Row>(n);
-        while (iter.hasNext())
-            resolvedRows.add(iter.next());
-
-        return resolvedRows;
     }
 
     public void preprocess(Message message)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr  7 23:24:44 2011
@@ -691,7 +691,7 @@ public class StorageProxy implements Sto
 
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+                    ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
                     handler.assureSufficientLiveNodes();
                     for (InetAddress endpoint : liveEndpoints)
                     {
@@ -700,17 +700,13 @@ public class StorageProxy implements Sto
                             logger.debug("reading " + c2 + " from " + endpoint);
                     }
 
-                    // if we're done, great, otherwise, move to the next range
                     try
                     {
-                        if (logger.isDebugEnabled())
+                        for (Row row : handler.get())
                         {
-                            for (Row row : handler.get())
-                            {
-                                logger.debug("range slices read " + row.key);
-                            }
+                            rows.add(row);
+                            logger.debug("range slices read {}", row.key);
                         }
-                        rows.addAll(handler.get());
                     }
                     catch (DigestMismatchException e)
                     {
@@ -718,6 +714,7 @@ public class StorageProxy implements Sto
                     }
                 }
 
+                // if we're done, great, otherwise, move to the next range
                 if (rows.size() >= command.max_keys)
                     break;
             }
@@ -976,7 +973,7 @@ public class StorageProxy implements Sto
                     return keyspace;
                 }
             };
-            ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
+            ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
             handler.assureSufficientLiveNodes();
 
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
@@ -988,21 +985,18 @@ public class StorageProxy implements Sto
                     logger.debug("reading " + command + " from " + endpoint);
             }
 
-            List<Row> theseRows;
             try
             {
-                theseRows = handler.get();
+                for (Row row : handler.get())
+                {
+                    rows.add(row);
+                    logger.debug("read {}", row);
+                }
             }
             catch (DigestMismatchException e)
             {
                 throw new RuntimeException(e);
             }
-            rows.addAll(theseRows);
-            if (logger.isDebugEnabled())
-            {
-                for (Row row : theseRows)
-                    logger.debug("read " + row);
-            }
             if (rows.size() >= index_clause.count)
                 return rows.subList(0, index_clause.count);
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Apr  7 23:24:44 2011
@@ -602,7 +602,7 @@ public class FBUtilities
         T rval = null;
         try
         {
-            rval = (T) cls.getDeclaredMethod("getInstance").invoke(null, (Object) null);
+            rval = (T) cls.getDeclaredMethod("getInstance").invoke(new Object[] {null, null});
 
         }
         catch (NoSuchMethodException e)