You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/10/12 00:43:23 UTC
[1/5] git commit: Fix int/bigint in CassandraStorage. Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6102
Updated Branches:
refs/heads/cassandra-2.0 a3ad2e822 -> e5dba3c62
refs/heads/trunk bc8e2475f -> b89cce9c8
Fix int/bigint in CassandraStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6102
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639c01a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639c01a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639c01a3
Branch: refs/heads/trunk
Commit: 639c01a3504ba2e2a55061093651a9973ad68d11
Parents: eee485e
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 15:22:35 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 15:22:35 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 82 +++++++++++++-------
.../cassandra/hadoop/pig/CassandraStorage.java | 45 ++++++-----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 8 +-
3 files changed, 85 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639c01a3/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 6ad4f9e..dbebfb5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -97,7 +97,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected String outputFormatClass;
protected int splitSize = 64 * 1024;
protected String partitionerClass;
- protected boolean usePartitionFilter = false;
+ protected boolean usePartitionFilter = false;
public AbstractCassandraStorage()
{
@@ -116,8 +116,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
/** convert a column to a tuple */
- protected Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+ protected Tuple columnToTuple(IColumn col, CfInfo cfInfo, AbstractType comparator) throws IOException
{
+ CfDef cfDef = cfInfo.cfDef;
Tuple pair = TupleFactory.getInstance().newTuple(2);
// name
@@ -131,13 +132,21 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
{
// standard
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- if (validators.get(col.name()) == null)
+ ByteBuffer colName;
+ if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
+ {
+ ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+ colName = names[names.length-1];
+ }
+ else
+ colName = col.name();
+ if (validators.get(colName) == null)
{
Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
}
else
- setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
+ setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
return pair;
}
else
@@ -145,7 +154,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
// super
ArrayList<Tuple> subcols = new ArrayList<Tuple>();
for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
+ subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
pair.set(1, new DefaultDataBag(subcols));
}
@@ -168,11 +177,16 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
/** get the columnfamily definition for the signature */
- protected CfDef getCfDef(String signature) throws IOException
+ protected CfInfo getCfInfo(String signature) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- return cfdefFromString(property.getProperty(signature));
+ String prop = property.getProperty(signature);
+ CfInfo cfInfo = new CfInfo();
+ cfInfo.cfDef = cfdefFromString(prop.substring(2));
+ cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
+ cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
+ return cfInfo;
}
/** construct a map to store the mashaller type to cassandra data type mapping */
@@ -329,10 +343,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.INTEGER;
else if (type instanceof AsciiType ||
type instanceof UTF8Type ||
- type instanceof DecimalType ||
- type instanceof InetAddressType ||
- type instanceof LexicalUUIDType ||
- type instanceof UUIDType )
+ type instanceof DecimalType || type instanceof InetAddressType )
return DataType.CHARARRAY;
else if (type instanceof FloatType)
return DataType.FLOAT;
@@ -513,11 +524,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
}
- // compose the CfDef for the columfamily
- CfDef cfDef = getCfDef(client);
+ // compose the CfInfo for the columfamily
+ CfInfo cfInfo = getCfInfo(client);
- if (cfDef != null)
- properties.setProperty(signature, cfdefToString(cfDef));
+ if (cfInfo.cfDef != null)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
+ properties.setProperty(signature, sb.toString());
+ }
else
throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
column_family,
@@ -563,17 +578,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return cfDef;
}
- /** return the CfDef for the column family */
- protected CfDef getCfDef(Cassandra.Client client)
+ /** return the CfInf for the column family */
+ protected CfInfo getCfInfo(Cassandra.Client client)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
SchemaDisagreementException,
TException,
- CharacterCodingException,
NotFoundException,
org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException
+ ConfigurationException,
+ IOException
{
// get CF meta data
String query = "SELECT type," +
@@ -627,12 +642,19 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
else
cql3Table = true;
}
- cfDef.column_metadata = getColumnMetadata(client, cql3Table);
- return cfDef;
+ cfDef.column_metadata = getColumnMetadata(client);
+ CfInfo cfInfo = new CfInfo();
+ cfInfo.cfDef = cfDef;
+ if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
+ cfInfo.compactCqlTable = true;
+
+ if (cql3Table)
+ cfInfo.cql3Table = true;
+ return cfInfo;
}
/** get a list of columns */
- protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -749,7 +771,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
/** get a list of columns with defined index*/
protected List<ColumnDef> getIndexes() throws IOException
{
- CfDef cfdef = getCfDef(loadSignature);
+ CfDef cfdef = getCfInfo(loadSignature).cfDef;
List<ColumnDef> indexes = new ArrayList<ColumnDef>();
for (ColumnDef cdef : cfdef.column_metadata)
{
@@ -778,15 +800,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
{
- if (validator instanceof DecimalType ||
- validator instanceof InetAddressType ||
- validator instanceof LexicalUUIDType ||
- validator instanceof UUIDType)
- {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
return validator.getString(value);
- }
else
return validator.compose(value);
}
+
+ protected class CfInfo
+ {
+ boolean compactCqlTable = false;
+ boolean cql3Table = false;
+ CfDef cfDef;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639c01a3/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 1135b70..a7cc1ad 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -101,7 +101,8 @@ public class CassandraStorage extends AbstractCassandraStorage
/** read wide row*/
public Tuple getNextWide() throws IOException
{
- CfDef cfDef = getCfDef(loadSignature);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
ByteBuffer key = null;
Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
@@ -124,7 +125,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
@@ -162,7 +163,7 @@ public class CassandraStorage extends AbstractCassandraStorage
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
@@ -179,14 +180,14 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
}
}
@@ -208,7 +209,8 @@ public class CassandraStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfDef cfDef = getCfDef(loadSignature);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
ByteBuffer key = reader.getCurrentKey();
Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
assert key != null && cf != null;
@@ -224,11 +226,21 @@ public class CassandraStorage extends AbstractCassandraStorage
// take care to iterate these in the same order as the schema does
for (ColumnDef cdef : cfDef.column_metadata)
{
- if (cf.containsKey(cdef.name))
+ boolean hasColumn = false;
+ boolean cql3Table = false;
+ try
{
- tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
+ hasColumn = cf.containsKey(cdef.name);
}
- else
+ catch (Exception e)
+ {
+ cql3Table = true;
+ }
+ if (hasColumn)
+ {
+ tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+ }
+ else if (!cql3Table)
{ // otherwise, we need to add an empty tuple to take its place
tuple.append(TupleFactory.getInstance().newTuple());
}
@@ -238,7 +250,7 @@ public class CassandraStorage extends AbstractCassandraStorage
for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
@@ -246,7 +258,7 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (ColumnDef cdef : getIndexes())
{
- Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
+ Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
tuple.append(throwaway.get(1));
}
}
@@ -356,7 +368,8 @@ public class CassandraStorage extends AbstractCassandraStorage
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfDef cfDef = getCfDef(loadSignature);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
if (cfDef.column_type.equals("Super"))
return null;
@@ -403,7 +416,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
- if (!widerows)
+ if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
{
// defined validators/indexes
for (ColumnDef cdef : cfDef.column_metadata)
@@ -697,7 +710,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
/** get a list of column for the column family */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -708,9 +721,6 @@ public class CassandraStorage extends AbstractCassandraStorage
ConfigurationException,
NotFoundException
{
- if (cql3Table)
- return new ArrayList<ColumnDef>();
-
return getColumnMeta(client, true, true);
}
@@ -795,6 +805,5 @@ public class CassandraStorage extends AbstractCassandraStorage
"[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639c01a3/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index e51338c..7e1f56c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -95,7 +95,8 @@ public class CqlStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfDef cfDef = getCfDef(loadSignature);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
Map<String, ByteBuffer> keys = reader.getCurrentKey();
Map<String, ByteBuffer> columns = reader.getCurrentValue();
assert keys != null && columns != null;
@@ -281,7 +282,8 @@ public class CqlStorage extends AbstractCassandraStorage
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfDef cfDef = getCfDef(loadSignature);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
@@ -429,7 +431,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
/** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
[5/5] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b89cce9c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b89cce9c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b89cce9c
Branch: refs/heads/trunk
Commit: b89cce9c8583d869cb3ca6fd26b4a9b47d71f108
Parents: bc8e247 e5dba3c
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 17:41:45 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 17:41:45 2013 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[4/5] git commit: Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"
Posted by br...@apache.org.
Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"
This reverts commit a3ad2e82249b88d4a05f24140948cdbc809d14f3, reversing
changes made to 8a506e66a66c004a7a253e3dd28845517da8a967.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5dba3c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5dba3c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5dba3c6
Branch: refs/heads/cassandra-2.0
Commit: e5dba3c6250eb7285777490a937b66a7092afa77
Parents: a3ad2e8
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 17:41:22 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 17:41:22 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 97 ++++++--------------
.../cassandra/hadoop/pig/CassandraStorage.java | 55 ++++-------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 8 +-
3 files changed, 51 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 486c781..c881734 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -97,7 +97,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected String outputFormatClass;
protected int splitSize = 64 * 1024;
protected String partitionerClass;
- protected boolean usePartitionFilter = false;
+ protected boolean usePartitionFilter = false;
public AbstractCassandraStorage()
{
@@ -116,9 +116,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
/** convert a column to a tuple */
- protected Tuple columnToTuple(IColumn col, CfInfo cfInfo, AbstractType comparator) throws IOException
+ protected Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
{
- CfDef cfDef = cfInfo.cfDef;
Tuple pair = TupleFactory.getInstance().newTuple(2);
// name
@@ -131,34 +130,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
if (validators.get(col.name()) == null)
{
- // standard
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- ByteBuffer colName;
- if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
- {
- ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
- colName = names[names.length-1];
- }
- else
- colName = col.name();
- if (validators.get(colName) == null)
- {
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
- }
- else
- setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
- return pair;
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
}
else
- {
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
-
- pair.set(1, new DefaultDataBag(subcols));
- }
+ setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
return pair;
}
@@ -178,16 +154,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
/** get the columnfamily definition for the signature */
- protected CfInfo getCfInfo(String signature) throws IOException
+ protected CfDef getCfDef(String signature) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- String prop = property.getProperty(signature);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfdefFromString(prop.substring(2));
- cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
- cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
- return cfInfo;
+ return cfdefFromString(property.getProperty(signature));
}
/** construct a map to store the mashaller type to cassandra data type mapping */
@@ -344,7 +315,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.INTEGER;
else if (type instanceof AsciiType ||
type instanceof UTF8Type ||
- type instanceof DecimalType || type instanceof InetAddressType )
+ type instanceof DecimalType ||
+ type instanceof InetAddressType ||
+ type instanceof LexicalUUIDType ||
+ type instanceof UUIDType )
return DataType.CHARARRAY;
else if (type instanceof FloatType)
return DataType.FLOAT;
@@ -525,15 +499,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
}
- // compose the CfInfo for the columfamily
- CfInfo cfInfo = getCfInfo(client);
+ // compose the CfDef for the columfamily
+ CfDef cfDef = getCfDef(client);
- if (cfInfo.cfDef != null)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
- properties.setProperty(signature, sb.toString());
- }
+ if (cfDef != null)
+ properties.setProperty(signature, cfdefToString(cfDef));
else
throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
column_family,
@@ -579,17 +549,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return cfDef;
}
- /** return the CfInf for the column family */
- protected CfInfo getCfInfo(Cassandra.Client client)
+ /** return the CfDef for the column family */
+ protected CfDef getCfDef(Cassandra.Client client)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
SchemaDisagreementException,
TException,
+ CharacterCodingException,
NotFoundException,
org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- IOException
+ ConfigurationException
{
// get CF meta data
String query = "SELECT type," +
@@ -643,19 +613,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
else
cql3Table = true;
}
- cfDef.column_metadata = getColumnMetadata(client);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfDef;
- if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
- cfInfo.compactCqlTable = true;
-
- if (cql3Table)
- cfInfo.cql3Table = true;
- return cfInfo;
+ cfDef.column_metadata = getColumnMetadata(client, cql3Table);
+ return cfDef;
}
/** get a list of columns */
- protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+ protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -772,7 +735,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
/** get a list of columns with defined index*/
protected List<ColumnDef> getIndexes() throws IOException
{
- CfDef cfdef = getCfInfo(loadSignature).cfDef;
+ CfDef cfdef = getCfDef(loadSignature);
List<ColumnDef> indexes = new ArrayList<ColumnDef>();
for (ColumnDef cdef : cfdef.column_metadata)
{
@@ -801,17 +764,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
{
- if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ if (validator instanceof DecimalType ||
+ validator instanceof InetAddressType ||
+ validator instanceof LexicalUUIDType ||
+ validator instanceof UUIDType)
+ {
return validator.getString(value);
+ }
else
return validator.compose(value);
}
-
- protected class CfInfo
- {
- boolean compactCqlTable = false;
- boolean cql3Table = false;
- CfDef cfDef;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index d9c55a1..4083236 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -103,8 +103,7 @@ public class CassandraStorage extends AbstractCassandraStorage
/** read wide row*/
public Tuple getNextWide() throws IOException
{
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = null;
Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
@@ -127,7 +126,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
@@ -165,7 +164,7 @@ public class CassandraStorage extends AbstractCassandraStorage
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
@@ -182,14 +181,14 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
}
}
@@ -211,8 +210,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
Map<ByteBuffer, Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
@@ -228,21 +226,11 @@ public class CassandraStorage extends AbstractCassandraStorage
// take care to iterate these in the same order as the schema does
for (ColumnDef cdef : cfDef.column_metadata)
{
- boolean hasColumn = false;
- boolean cql3Table = false;
- try
+ if (cf.containsKey(cdef.name))
{
- hasColumn = cf.containsKey(cdef.name);
+ tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
}
- catch (Exception e)
- {
- cql3Table = true;
- }
- if (hasColumn)
- {
- tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
- }
- else if (!cql3Table)
+ else
{ // otherwise, we need to add an empty tuple to take its place
tuple.append(TupleFactory.getInstance().newTuple());
}
@@ -252,7 +240,7 @@ public class CassandraStorage extends AbstractCassandraStorage
for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
@@ -260,7 +248,7 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (ColumnDef cdef : getIndexes())
{
- Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+ Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
tuple.append(throwaway.get(1));
}
}
@@ -370,8 +358,7 @@ public class CassandraStorage extends AbstractCassandraStorage
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
if (cfDef.column_type.equals("Super"))
return null;
@@ -418,7 +405,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
- if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+ if (!widerows)
{
// defined validators/indexes
for (ColumnDef cdef : cfDef.column_metadata)
@@ -712,17 +699,12 @@ public class CassandraStorage extends AbstractCassandraStorage
}
/** get a list of column for the column family */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
{
+ if (cql3Table)
+ return new ArrayList<>();
+
return getColumnMeta(client, true, true);
}
@@ -807,5 +789,6 @@ public class CassandraStorage extends AbstractCassandraStorage
"[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 9a9b522..50ee6b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -94,8 +94,7 @@ public class CqlStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
Map<String, ByteBuffer> keys = reader.getCurrentKey();
Map<String, ByteBuffer> columns = reader.getCurrentValue();
assert keys != null && columns != null;
@@ -281,8 +280,7 @@ public class CqlStorage extends AbstractCassandraStorage
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
@@ -432,7 +430,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
/** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
[2/5] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3ad2e82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3ad2e82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3ad2e82
Branch: refs/heads/trunk
Commit: a3ad2e82249b88d4a05f24140948cdbc809d14f3
Parents: 8a506e6 639c01a
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 15:30:27 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 15:30:27 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 97 ++++++++++++++------
.../cassandra/hadoop/pig/CassandraStorage.java | 55 +++++++----
.../apache/cassandra/hadoop/pig/CqlStorage.java | 8 +-
3 files changed, 109 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index c881734,dbebfb5..486c781
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@@ -127,14 -128,36 +128,37 @@@ public abstract class AbstractCassandra
setTupleValue(pair, 0, cassandraToObj(comparator, col.name()));
// value
- if (col instanceof Column)
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ if (validators.get(col.name()) == null)
{
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
+ // standard
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+ ByteBuffer colName;
+ if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
+ {
+ ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
+ colName = names[names.length-1];
+ }
+ else
+ colName = col.name();
+ if (validators.get(colName) == null)
+ {
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
+ }
+ else
+ setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
+ return pair;
}
else
- setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
+ {
+ // super
+ ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+ for (IColumn subcol : col.getSubColumns())
+ subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
+
+ pair.set(1, new DefaultDataBag(subcols));
+ }
return pair;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 4083236,a7cc1ad..d9c55a1
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -124,9 -123,9 +125,9 @@@ public class CassandraStorage extends A
key = (ByteBuffer)reader.getCurrentKey();
tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
}
- for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
@@@ -162,9 -161,9 +163,9 @@@
tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
- for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
@@@ -176,19 -175,19 +177,19 @@@
else
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
}
- SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ SortedMap<ByteBuffer, Column> row = (SortedMap<ByteBuffer, Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
- for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
- for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
}
}
@@@ -210,9 -209,10 +211,10 @@@
if (!reader.nextKeyValue())
return null;
- CfDef cfDef = getCfDef(loadSignature);
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
ByteBuffer key = reader.getCurrentKey();
- Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
+ Map<ByteBuffer, Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
// output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
@@@ -237,10 -247,10 +249,10 @@@
added.put(cdef.name, true);
}
// now add all the other columns
- for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
- bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3ad2e82/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[3/5] git commit: Revert "Merge branch 'cassandra-1.2' into
cassandra-2.0"
Posted by br...@apache.org.
Revert "Merge branch 'cassandra-1.2' into cassandra-2.0"
This reverts commit a3ad2e82249b88d4a05f24140948cdbc809d14f3, reversing
changes made to 8a506e66a66c004a7a253e3dd28845517da8a967.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5dba3c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5dba3c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5dba3c6
Branch: refs/heads/trunk
Commit: e5dba3c6250eb7285777490a937b66a7092afa77
Parents: a3ad2e8
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 11 17:41:22 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 11 17:41:22 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 97 ++++++--------------
.../cassandra/hadoop/pig/CassandraStorage.java | 55 ++++-------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 8 +-
3 files changed, 51 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 486c781..c881734 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -97,7 +97,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected String outputFormatClass;
protected int splitSize = 64 * 1024;
protected String partitionerClass;
- protected boolean usePartitionFilter = false;
+ protected boolean usePartitionFilter = false;
public AbstractCassandraStorage()
{
@@ -116,9 +116,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
/** convert a column to a tuple */
- protected Tuple columnToTuple(IColumn col, CfInfo cfInfo, AbstractType comparator) throws IOException
+ protected Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
{
- CfDef cfDef = cfInfo.cfDef;
Tuple pair = TupleFactory.getInstance().newTuple(2);
// name
@@ -131,34 +130,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
if (validators.get(col.name()) == null)
{
- // standard
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
- ByteBuffer colName;
- if (cfInfo.cql3Table && !cfInfo.compactCqlTable)
- {
- ByteBuffer[] names = ((AbstractCompositeType) parseType(cfDef.comparator_type)).split(col.name());
- colName = names[names.length-1];
- }
- else
- colName = col.name();
- if (validators.get(colName) == null)
- {
- Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
- setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
- }
- else
- setTupleValue(pair, 1, cassandraToObj(validators.get(colName), col.value()));
- return pair;
+ Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value()));
}
else
- {
- // super
- ArrayList<Tuple> subcols = new ArrayList<Tuple>();
- for (IColumn subcol : col.getSubColumns())
- subcols.add(columnToTuple(subcol, cfInfo, parseType(cfDef.getSubcomparator_type())));
-
- pair.set(1, new DefaultDataBag(subcols));
- }
+ setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value()));
return pair;
}
@@ -178,16 +154,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
/** get the columnfamily definition for the signature */
- protected CfInfo getCfInfo(String signature) throws IOException
+ protected CfDef getCfDef(String signature) throws IOException
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- String prop = property.getProperty(signature);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfdefFromString(prop.substring(2));
- cfInfo.compactCqlTable = prop.charAt(0) == '1' ? true : false;
- cfInfo.cql3Table = prop.charAt(1) == '1' ? true : false;
- return cfInfo;
+ return cfdefFromString(property.getProperty(signature));
}
/** construct a map to store the mashaller type to cassandra data type mapping */
@@ -344,7 +315,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return DataType.INTEGER;
else if (type instanceof AsciiType ||
type instanceof UTF8Type ||
- type instanceof DecimalType || type instanceof InetAddressType )
+ type instanceof DecimalType ||
+ type instanceof InetAddressType ||
+ type instanceof LexicalUUIDType ||
+ type instanceof UUIDType )
return DataType.CHARARRAY;
else if (type instanceof FloatType)
return DataType.FLOAT;
@@ -525,15 +499,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
}
- // compose the CfInfo for the columfamily
- CfInfo cfInfo = getCfInfo(client);
+ // compose the CfDef for the columfamily
+ CfDef cfDef = getCfDef(client);
- if (cfInfo.cfDef != null)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(cfInfo.compactCqlTable ? 1 : 0).append(cfInfo.cql3Table ? 1: 0).append(cfdefToString(cfInfo.cfDef));
- properties.setProperty(signature, sb.toString());
- }
+ if (cfDef != null)
+ properties.setProperty(signature, cfdefToString(cfDef));
else
throw new IOException(String.format("Column family '%s' not found in keyspace '%s'",
column_family,
@@ -579,17 +549,17 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return cfDef;
}
- /** return the CfInf for the column family */
- protected CfInfo getCfInfo(Cassandra.Client client)
+ /** return the CfDef for the column family */
+ protected CfDef getCfDef(Cassandra.Client client)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
SchemaDisagreementException,
TException,
+ CharacterCodingException,
NotFoundException,
org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- IOException
+ ConfigurationException
{
// get CF meta data
String query = "SELECT type," +
@@ -643,19 +613,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
else
cql3Table = true;
}
- cfDef.column_metadata = getColumnMetadata(client);
- CfInfo cfInfo = new CfInfo();
- cfInfo.cfDef = cfDef;
- if (cql3Table && !(parseType(cfDef.comparator_type) instanceof AbstractCompositeType))
- cfInfo.compactCqlTable = true;
-
- if (cql3Table)
- cfInfo.cql3Table = true;
- return cfInfo;
+ cfDef.column_metadata = getColumnMetadata(client, cql3Table);
+ return cfDef;
}
/** get a list of columns */
- protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+ protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -772,7 +735,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
/** get a list of columns with defined index*/
protected List<ColumnDef> getIndexes() throws IOException
{
- CfDef cfdef = getCfInfo(loadSignature).cfDef;
+ CfDef cfdef = getCfDef(loadSignature);
List<ColumnDef> indexes = new ArrayList<ColumnDef>();
for (ColumnDef cdef : cfdef.column_metadata)
{
@@ -801,17 +764,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected Object cassandraToObj(AbstractType validator, ByteBuffer value)
{
- if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ if (validator instanceof DecimalType ||
+ validator instanceof InetAddressType ||
+ validator instanceof LexicalUUIDType ||
+ validator instanceof UUIDType)
+ {
return validator.getString(value);
+ }
else
return validator.compose(value);
}
-
- protected class CfInfo
- {
- boolean compactCqlTable = false;
- boolean cql3Table = false;
- CfDef cfDef;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index d9c55a1..4083236 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -103,8 +103,7 @@ public class CassandraStorage extends AbstractCassandraStorage
/** read wide row*/
public Tuple getNextWide() throws IOException
{
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = null;
Tuple tuple = null;
DefaultDataBag bag = new DefaultDataBag();
@@ -127,7 +126,7 @@ public class CassandraStorage extends AbstractCassandraStorage
}
for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
@@ -165,7 +164,7 @@ public class CassandraStorage extends AbstractCassandraStorage
addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
@@ -182,14 +181,14 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
lastKey = null;
lastRow = null;
}
for (Map.Entry<ByteBuffer, Column> entry : row.entrySet())
{
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
}
}
@@ -211,8 +210,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = reader.getCurrentKey();
Map<ByteBuffer, Column> cf = reader.getCurrentValue();
assert key != null && cf != null;
@@ -228,21 +226,11 @@ public class CassandraStorage extends AbstractCassandraStorage
// take care to iterate these in the same order as the schema does
for (ColumnDef cdef : cfDef.column_metadata)
{
- boolean hasColumn = false;
- boolean cql3Table = false;
- try
+ if (cf.containsKey(cdef.name))
{
- hasColumn = cf.containsKey(cdef.name);
+ tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
}
- catch (Exception e)
- {
- cql3Table = true;
- }
- if (hasColumn)
- {
- tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
- }
- else if (!cql3Table)
+ else
{ // otherwise, we need to add an empty tuple to take its place
tuple.append(TupleFactory.getInstance().newTuple());
}
@@ -252,7 +240,7 @@ public class CassandraStorage extends AbstractCassandraStorage
for (Map.Entry<ByteBuffer, Column> entry : cf.entrySet())
{
if (!added.containsKey(entry.getKey()))
- bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+ bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
// finally, special top-level indexes if needed
@@ -260,7 +248,7 @@ public class CassandraStorage extends AbstractCassandraStorage
{
for (ColumnDef cdef : getIndexes())
{
- Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+ Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
tuple.append(throwaway.get(1));
}
}
@@ -370,8 +358,7 @@ public class CassandraStorage extends AbstractCassandraStorage
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
if (cfDef.column_type.equals("Super"))
return null;
@@ -418,7 +405,7 @@ public class CassandraStorage extends AbstractCassandraStorage
// add the key first, then the indexed columns, and finally the bag
allSchemaFields.add(keyFieldSchema);
- if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+ if (!widerows)
{
// defined validators/indexes
for (ColumnDef cdef : cfDef.column_metadata)
@@ -712,17 +699,12 @@ public class CassandraStorage extends AbstractCassandraStorage
}
/** get a list of column for the column family */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+ throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
{
+ if (cql3Table)
+ return new ArrayList<>();
+
return getColumnMeta(client, true, true);
}
@@ -807,5 +789,6 @@ public class CassandraStorage extends AbstractCassandraStorage
"[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5dba3c6/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 9a9b522..50ee6b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -94,8 +94,7 @@ public class CqlStorage extends AbstractCassandraStorage
if (!reader.nextKeyValue())
return null;
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
Map<String, ByteBuffer> keys = reader.getCurrentKey();
Map<String, ByteBuffer> columns = reader.getCurrentValue();
assert keys != null && columns != null;
@@ -281,8 +280,7 @@ public class CqlStorage extends AbstractCassandraStorage
public ResourceSchema getSchema(String location, Job job) throws IOException
{
setLocation(location, job);
- CfInfo cfInfo = getCfInfo(loadSignature);
- CfDef cfDef = cfInfo.cfDef;
+ CfDef cfDef = getCfDef(loadSignature);
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
@@ -432,7 +430,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
/** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+ protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
throws InvalidRequestException,
UnavailableException,
TimedOutException,