You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/02/01 20:27:07 UTC
[10/16] git commit: (Pig) fix CassandraStorage to use correct
comparator in Super ColumnFamily case patch by Pavel Yaskevich;
reviewed by Brandon Williams for CASSANDRA-3251
(Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6173fa9e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6173fa9e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6173fa9e
Branch: refs/heads/cassandra-1.1
Commit: 6173fa9e3443909474e14f5695e705df5b10dbcc
Parents: f4064b5
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Jan 31 23:24:27 2012 +0200
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Wed Feb 1 00:48:26 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../cassandra/hadoop/pig/CassandraStorage.java | 45 ++++++++++-----
2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6173fa9e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d32eba8..468dbcf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,7 +54,8 @@ Merged from 0.8:
* use correct list of replicas for LOCAL_QUORUM reads when read repair
is disabled (CASSANDRA-3696)
* block on flush before compacting hints (may prevent OOM) (CASSANDRA-3733)
-
+ * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily
+ case (CASSANDRA-3251)
1.0.6
* (CQL) fix cqlsh support for replicate_on_write (CASSANDRA-3596)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6173fa9e/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 4513783..1a594ce 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -128,7 +128,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
{
- columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
+ columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.set(1, new DefaultDataBag(columns));
@@ -140,29 +140,31 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
}
- private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
+ private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- setTupleValue(pair, 0, marshallers.get(0).compose(name));
+ setTupleValue(pair, 0, comparator.compose(col.name()));
if (col instanceof Column)
{
// standard
- if (validators.get(name) == null)
+ if (validators.get(col.name()) == null)
setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
else
- setTupleValue(pair, 1, validators.get(name).compose(col.value()));
+ setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
return pair;
}
+ else
+ {
+ // super
+ ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+ for (IColumn subcol : col.getSubColumns())
+ subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
-
- pair.set(1, new DefaultDataBag(subcols));
+ pair.set(1, new DefaultDataBag(subcols));
+ }
return pair;
}
@@ -188,12 +190,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
{
ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
- AbstractType comparator = null;
- AbstractType default_validator = null;
- AbstractType key_validator = null;
+ AbstractType comparator;
+ AbstractType subcomparator;
+ AbstractType default_validator;
+ AbstractType key_validator;
try
{
comparator = TypeParser.parse(cfDef.getComparator_type());
+ subcomparator = TypeParser.parse(cfDef.getSubcomparator_type());
default_validator = TypeParser.parse(cfDef.getDefault_validation_class());
key_validator = TypeParser.parse(cfDef.getKey_validation_class());
}
@@ -205,6 +209,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
marshallers.add(comparator);
marshallers.add(default_validator);
marshallers.add(key_validator);
+ marshallers.add(subcomparator);
return marshallers;
}
@@ -230,6 +235,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return validators;
}
+ private AbstractType parseType(String type) throws IOException
+ {
+ try
+ {
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
@Override
public InputFormat getInputFormat()
{