You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/08 01:24:45 UTC
svn commit: r1090062 - in /cassandra/trunk: ./ conf/ contrib/
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Thu Apr 7 23:24:44 2011
New Revision: 1090062
URL: http://svn.apache.org/viewvc?rev=1090062&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/conf/cassandra-env.sh
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Thu Apr 7 23:24:44 2011
@@ -130,7 +130,7 @@ JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatin
# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
-# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log"
+# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"
# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/contrib:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Thu Apr 7 23:24:44 2011
@@ -68,7 +68,7 @@ public class CassandraStorage extends Lo
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
- private static String UDFCONTEXT_SCHEMA_KEY = "schema";
+ private static String UDFCONTEXT_SCHEMA_KEY = "cassandra.schema";
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -168,7 +168,7 @@ public class CassandraStorage extends Lo
private CfDef getCfDef()
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(ResourceSchema.class);
+ Properties property = context.getUDFProperties(CassandraStorage.class);
return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
}
@@ -314,6 +314,7 @@ public class CassandraStorage extends Lo
setLocationFromUri(location);
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
+ initSchema();
}
public OutputFormat getOutputFormat()
@@ -443,41 +444,46 @@ public class CassandraStorage extends Lo
private void initSchema()
{
- Cassandra.Client client = null;
- try
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CassandraStorage.class);
+
+ // Only get the schema if we haven't already gotten it
+ if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY))
{
- client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
- CfDef cfDef = null;
- client.set_keyspace(keyspace);
- KsDef ksDef = client.describe_keyspace(keyspace);
- List<CfDef> defs = ksDef.getCf_defs();
- for (CfDef def : defs)
+ Cassandra.Client client = null;
+ try
{
- if (column_family.equalsIgnoreCase(def.getName()))
+ client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+ CfDef cfDef = null;
+ client.set_keyspace(keyspace);
+ KsDef ksDef = client.describe_keyspace(keyspace);
+ List<CfDef> defs = ksDef.getCf_defs();
+ for (CfDef def : defs)
{
- cfDef = def;
- break;
+ if (column_family.equalsIgnoreCase(def.getName()))
+ {
+ cfDef = def;
+ break;
+ }
}
+ property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InvalidRequestException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (NotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(ResourceSchema.class);
- property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- catch (NotFoundException e)
- {
- throw new RuntimeException(e);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
}
}
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 7 23:24:44 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1088805,1089079,1089139,1089976
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1089929,1089976
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Apr 7 23:24:44 2011
@@ -24,6 +24,8 @@ import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import org.apache.commons.collections.iterators.CollatingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,7 @@ import org.apache.cassandra.utils.Reduci
* Turns RangeSliceReply objects into row (string -> CF) maps, resolving
* to the most recent ColumnFamily and setting up read repairs as necessary.
*/
-public class RangeSliceResponseResolver implements IResponseResolver<List<Row>>
+public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Row>>
{
private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
private final String table;
@@ -62,7 +64,7 @@ public class RangeSliceResponseResolver
// Note: this deserializes the response a 2nd time if getData was called first
// (this is not currently an issue since we don't do read repair for range queries.)
- public List<Row> resolve() throws IOException
+ public Iterable<Row> resolve() throws IOException
{
CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
{
@@ -81,7 +83,8 @@ public class RangeSliceResponseResolver
}
// for each row, compute the combination of all different versions seen, and repair incomplete versions
- ReducingIterator<Pair<Row,InetAddress>, Row> iter = new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
+
+ return new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
{
List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
@@ -109,12 +112,6 @@ public class RangeSliceResponseResolver
return new Row(key, resolved);
}
};
-
- List<Row> resolvedRows = new ArrayList<Row>(n);
- while (iter.hasNext())
- resolvedRows.add(iter.next());
-
- return resolvedRows;
}
public void preprocess(Message message)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 7 23:24:44 2011
@@ -691,7 +691,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+ ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
handler.assureSufficientLiveNodes();
for (InetAddress endpoint : liveEndpoints)
{
@@ -700,17 +700,13 @@ public class StorageProxy implements Sto
logger.debug("reading " + c2 + " from " + endpoint);
}
- // if we're done, great, otherwise, move to the next range
try
{
- if (logger.isDebugEnabled())
+ for (Row row : handler.get())
{
- for (Row row : handler.get())
- {
- logger.debug("range slices read " + row.key);
- }
+ rows.add(row);
+ logger.debug("range slices read {}", row.key);
}
- rows.addAll(handler.get());
}
catch (DigestMismatchException e)
{
@@ -718,6 +714,7 @@ public class StorageProxy implements Sto
}
}
+ // if we're done, great, otherwise, move to the next range
if (rows.size() >= command.max_keys)
break;
}
@@ -976,7 +973,7 @@ public class StorageProxy implements Sto
return keyspace;
}
};
- ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
+ ReadCallback<Iterable<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
handler.assureSufficientLiveNodes();
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
@@ -988,21 +985,18 @@ public class StorageProxy implements Sto
logger.debug("reading " + command + " from " + endpoint);
}
- List<Row> theseRows;
try
{
- theseRows = handler.get();
+ for (Row row : handler.get())
+ {
+ rows.add(row);
+ logger.debug("read {}", row);
+ }
}
catch (DigestMismatchException e)
{
throw new RuntimeException(e);
}
- rows.addAll(theseRows);
- if (logger.isDebugEnabled())
- {
- for (Row row : theseRows)
- logger.debug("read " + row);
- }
if (rows.size() >= index_clause.count)
return rows.subList(0, index_clause.count);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1090062&r1=1090061&r2=1090062&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Apr 7 23:24:44 2011
@@ -602,7 +602,7 @@ public class FBUtilities
T rval = null;
try
{
- rval = (T) cls.getDeclaredMethod("getInstance").invoke(null, (Object) null);
+ rval = (T) cls.getDeclaredMethod("getInstance").invoke(new Object[] {null, null});
}
catch (NoSuchMethodException e)