You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/31 22:41:31 UTC
git commit: (Pig) fix CassandraStorage to use correct comparator in
Super ColumnFamily case patch by Pavel Yaskevich;
reviewed by Brandon Williams for CASSANDRA-3251
Updated Branches:
refs/heads/cassandra-0.8 9ecaa2a5c -> dbd8ced10
(Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbd8ced1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbd8ced1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbd8ced1
Branch: refs/heads/cassandra-0.8
Commit: dbd8ced107fad720c416d7e4919cb33884378a02
Parents: 9ecaa2a
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Jan 31 23:24:27 2012 +0200
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Wed Feb 1 00:39:37 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../cassandra/hadoop/pig/CassandraStorage.java | 45 ++++++++++-----
2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd8ced1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64b2747..a6a724f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,7 +3,8 @@
* use correct list of replicas for LOCAL_QUORUM reads when read repair
is disabled (CASSANDRA-3696)
* block on flush before compacting hints (may prevent OOM) (CASSANDRA-3733)
-
+ * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily
+ case (CASSANDRA-3251)
0.8.9
* avoid logging (harmless) exception when GC takes < 1ms (CASSANDRA-3656)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd8ced1/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index fd0959d..a0dec20 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -128,7 +128,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
{
- columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
+ columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.set(1, new DefaultDataBag(columns));
@@ -140,29 +140,31 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
}
}
- private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
+ private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- setTupleValue(pair, 0, marshallers.get(0).compose(name));
+ setTupleValue(pair, 0, comparator.compose(col.name()));
if (col instanceof Column)
{
// standard
- if (validators.get(name) == null)
+ if (validators.get(col.name()) == null)
setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
else
- setTupleValue(pair, 1, validators.get(name).compose(col.value()));
+ setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
return pair;
}
+ else
+ {
+ // super
+ ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+ for (IColumn subcol : col.getSubColumns())
+ subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
-
- pair.set(1, new DefaultDataBag(subcols));
+ pair.set(1, new DefaultDataBag(subcols));
+ }
return pair;
}
@@ -188,12 +190,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
{
ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
- AbstractType comparator = null;
- AbstractType default_validator = null;
- AbstractType key_validator = null;
+ AbstractType comparator;
+ AbstractType subcomparator;
+ AbstractType default_validator;
+ AbstractType key_validator;
try
{
comparator = TypeParser.parse(cfDef.getComparator_type());
+ subcomparator = TypeParser.parse(cfDef.getSubcomparator_type());
default_validator = TypeParser.parse(cfDef.getDefault_validation_class());
key_validator = TypeParser.parse(cfDef.getKey_validation_class());
}
@@ -205,6 +209,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
marshallers.add(comparator);
marshallers.add(default_validator);
marshallers.add(key_validator);
+ marshallers.add(subcomparator);
return marshallers;
}
@@ -230,6 +235,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
return validators;
}
+ private AbstractType parseType(String type) throws IOException
+ {
+ try
+ {
+ return TypeParser.parse(type);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
@Override
public InputFormat getInputFormat()
{