Posted to commits@cassandra.apache.org by "Pete Warden (Issue Comment Edited) (JIRA)" <ji...@apache.org> on 2011/10/20 08:27:12 UTC
[jira] [Issue Comment Edited] (CASSANDRA-3371) Cassandra inferred
schema and actual data don't match
[ https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13130990#comment-13130990 ]
Pete Warden edited comment on CASSANDRA-3371 at 10/20/11 6:26 AM:
------------------------------------------------------------------
Is there a reason the columns can't at least go into a map? As things stand, it's painfully hard to do the natural row.column.value lookup in a script. Or to put it as a concrete example, I can currently do this:
all_votes = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
album_votes = FILTER all_votes BY ((vote_type EQ 'album_like') OR (vote_type EQ 'album_dislike'));
What does this example look like with your approach?
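To make the asymmetry concrete, here is a small sketch (in Python rather than Pig, with values taken from the examples in this issue) contrasting the linear scan that a bag of (name, value) tuples forces on a script with the direct lookup a map would allow:

```python
# One row as it arrives today: a key plus a bag of (name, value) tuples.
row = ("691831038_1317937188.48955",
       [("vote_type", "album_dislike"), ("voter", "691831038")])

def lookup_bag(columns, name):
    """Linear scan over the bag -- what a script must do today."""
    for col_name, value in columns:
        if col_name == name:
            return value
    return None

# With the columns in a map, the natural row.column.value lookup
# becomes a single key access instead of a scan.
columns_map = dict(row[1])

assert lookup_bag(row[1], "vote_type") == "album_dislike"
assert columns_map["vote_type"] == "album_dislike"
```

The point of the sketch is only the access pattern: a map gives the script direct, name-based addressing of column values.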
> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
> Key: CASSANDRA-3371
> URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
> Project: Cassandra
> Issue Type: Bug
> Components: Hadoop
> Affects Versions: 0.8.7
> Reporter: Pete Warden
> Assignee: Brandon Williams
> Attachments: 3371-v2.txt, pig.diff
>
>
> It looks like there may be a mismatch between the schema reported by the latest CassandraStorage.java and the data that's actually returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type: chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which contains 16 values. In fact, getNext() seems to return an inner bag containing 7 tuples, each of which contains two values.
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string: bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray}
> Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (revision 1185044)
> +++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (working copy)
> @@ -26,7 +26,7 @@
> import org.apache.cassandra.db.marshal.IntegerType;
> import org.apache.cassandra.db.marshal.TypeParser;
> import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>
> @@ -122,15 +122,15 @@
> assert key != null && cf != null;
>
> // and wrap it in a tuple
> - Tuple tuple = TupleFactory.getInstance().newTuple(2);
> + Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
> ArrayList<Tuple> columns = new ArrayList<Tuple>();
> - tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> + int tupleIndex = 0;
> + tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
> {
> - columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
> + tuple.set(tupleIndex++, columnToTuple(entry.getKey(), entry.getValue(), cfDef));
> }
>
> - tuple.set(1, new DefaultDataBag(columns));
> return tuple;
> }
> catch (InterruptedException e)
> @@ -139,30 +139,22 @@
> }
> }
>
> - private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
> + private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
> {
> - Tuple pair = TupleFactory.getInstance().newTuple(2);
> List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
> Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>
> - setTupleValue(pair, 0, marshallers.get(0).compose(name));
> if (col instanceof Column)
> {
> // standard
> if (validators.get(name) == null)
> - setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
> + return marshallers.get(1).compose(col.value());
> else
> - setTupleValue(pair, 1, validators.get(name).compose(col.value()));
> - return pair;
> + return validators.get(name).compose(col.value());
> }
>
> - // super
> - ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> - for (IColumn subcol : col.getSubColumns())
> - subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -
> - pair.set(1, new DefaultDataBag(subcols));
> - return pair;
> + // super not currently handled
> + return null;
> }
>
> private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
> @@ -312,62 +304,32 @@
> // top-level schema, no type
> ResourceSchema schema = new ResourceSchema();
>
> + ResourceFieldSchema[] tupleFields = new ResourceFieldSchema[cfDef.column_metadata.size()+1];
> + int tupleIndex = 0;
> +
> // add key
> ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> - keyFieldSchema.setName("key");
> + keyFieldSchema.setName("cassandra_key");
> keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> + tupleFields[tupleIndex++] = keyFieldSchema;
>
> - // will become the bag of tuples
> - ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> - bagFieldSchema.setName("columns");
> - bagFieldSchema.setType(DataType.BAG);
> - ResourceSchema bagSchema = new ResourceSchema();
> -
> -
> List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
> Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> - List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
>
> - // default comparator/validator
> - ResourceSchema innerTupleSchema = new ResourceSchema();
> - ResourceFieldSchema tupleField = new ResourceFieldSchema();
> - tupleField.setType(DataType.TUPLE);
> - tupleField.setSchema(innerTupleSchema);
> -
> - ResourceFieldSchema colSchema = new ResourceFieldSchema();
> - colSchema.setName("name");
> - colSchema.setType(getPigType(marshallers.get(0)));
> - tupleFields.add(colSchema);
> -
> - ResourceFieldSchema valSchema = new ResourceFieldSchema();
> - AbstractType validator = marshallers.get(1);
> - valSchema.setName("value");
> - valSchema.setType(getPigType(validator));
> - tupleFields.add(valSchema);
> -
> // defined validators/indexes
> for (ColumnDef cdef : cfDef.column_metadata)
> {
> - colSchema = new ResourceFieldSchema();
> - colSchema.setName(new String(cdef.getName()));
> - colSchema.setType(getPigType(marshallers.get(0)));
> - tupleFields.add(colSchema);
> -
> - valSchema = new ResourceFieldSchema();
> - validator = validators.get(cdef.getName());
> + ResourceFieldSchema valSchema = new ResourceFieldSchema();
> + AbstractType validator = validators.get(cdef.getName());
> if (validator == null)
> validator = marshallers.get(1);
> - valSchema.setName("value");
> + valSchema.setName(new String(cdef.getName()));
> valSchema.setType(getPigType(validator));
> - tupleFields.add(valSchema);
> + tupleFields[tupleIndex++] = valSchema;
> }
> - innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
>
> - // a bag can contain only one tuple, but that tuple can contain anything
> - bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> - bagFieldSchema.setSchema(bagSchema);
> // top level schema contains everything
> - schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
> + schema.setFields(tupleFields);
> return schema;
> }
>
> @@ -601,7 +563,7 @@
> TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
> try
> {
> - return Hex.bytesToHex(serializer.serialize(cfDef));
> + return FBUtilities.bytesToHex(serializer.serialize(cfDef));
> }
> catch (TException e)
> {
> @@ -616,7 +578,7 @@
> CfDef cfDef = new CfDef();
> try
> {
> - deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> + deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
> }
> catch (TException e)
> {
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira