Posted to commits@cassandra.apache.org by "Pete Warden (Issue Comment Edited) (JIRA)" <ji...@apache.org> on 2011/10/20 08:27:12 UTC

[jira] [Issue Comment Edited] (CASSANDRA-3371) Cassandra inferred schema and actual data don't match

    [ https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13130990#comment-13130990 ] 

Pete Warden edited comment on CASSANDRA-3371 at 10/20/11 6:26 AM:
------------------------------------------------------------------

Is there a reason the columns can't at least go into a map? As things stand, it's painfully hard to do the natural row.column.value lookup in a script. To put it concretely, I can currently do this:

all_votes = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
album_votes = FILTER all_votes BY ((vote_type EQ 'album_like') OR (vote_type EQ 'album_dislike'));

What does this example look like with your approach?
                
> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
>                 Key: CASSANDRA-3371
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Hadoop
>    Affects Versions: 0.8.7
>            Reporter: Pete Warden
>            Assignee: Brandon Williams
>         Attachments: 3371-v2.txt, pig.diff
>
>
> It's looking like there may be a mismatch between the schema that's being reported by the latest CassandraStorage.java, and the data that's actually returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type: chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which contains 16 values. In fact, getNext() seems to return an inner bag containing 7 tuples, each of which contains two values. 
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string: bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray}
> Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java	(revision 1185044)
> +++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java	(working copy)
> @@ -26,7 +26,7 @@
>  import org.apache.cassandra.db.marshal.IntegerType;
>  import org.apache.cassandra.db.marshal.TypeParser;
>  import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  
> @@ -122,15 +122,15 @@
>              assert key != null && cf != null;
>              
>              // and wrap it in a tuple
> -	        Tuple tuple = TupleFactory.getInstance().newTuple(2);
> +	        Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
>              ArrayList<Tuple> columns = new ArrayList<Tuple>();
> -            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> +            int tupleIndex = 0;
> +            tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));            
>              for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
>              {
> -                columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
> +                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), entry.getValue(), cfDef));
>              }
>  
> -            tuple.set(1, new DefaultDataBag(columns));
>              return tuple;
>          }
>          catch (InterruptedException e)
> @@ -139,30 +139,22 @@
>          }
>      }
>  
> -    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
> +    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
>      {
> -        Tuple pair = TupleFactory.getInstance().newTuple(2);
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>  
> -        setTupleValue(pair, 0, marshallers.get(0).compose(name));
>          if (col instanceof Column)
>          {
>              // standard
>              if (validators.get(name) == null)
> -                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
> +                return marshallers.get(1).compose(col.value());
>              else
> -                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
> -            return pair;
> +                return validators.get(name).compose(col.value());
>          }
>  
> -        // super
> -        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> -        for (IColumn subcol : col.getSubColumns())
> -            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -        
> -        pair.set(1, new DefaultDataBag(subcols));
> -        return pair;
> +        // super not currently handled
> +        return null;
>      }
>  
>      private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
> @@ -312,62 +304,32 @@
>          // top-level schema, no type
>          ResourceSchema schema = new ResourceSchema();
>  
> +        ResourceFieldSchema[] tupleFields = new ResourceFieldSchema[cfDef.column_metadata.size()+1];
> +        int tupleIndex = 0;
> +        
>          // add key
>          ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> -        keyFieldSchema.setName("key");
> +        keyFieldSchema.setName("cassandra_key");
>          keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> +        tupleFields[tupleIndex++] = keyFieldSchema;
>  
> -        // will become the bag of tuples
> -        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> -        bagFieldSchema.setName("columns");
> -        bagFieldSchema.setType(DataType.BAG);
> -        ResourceSchema bagSchema = new ResourceSchema();
> -
> -
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> -        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
>  
> -        // default comparator/validator
> -        ResourceSchema innerTupleSchema = new ResourceSchema();
> -        ResourceFieldSchema tupleField = new ResourceFieldSchema();
> -        tupleField.setType(DataType.TUPLE);
> -        tupleField.setSchema(innerTupleSchema);
> -
> -        ResourceFieldSchema colSchema = new ResourceFieldSchema();
> -        colSchema.setName("name");
> -        colSchema.setType(getPigType(marshallers.get(0)));
> -        tupleFields.add(colSchema);
> -
> -        ResourceFieldSchema valSchema = new ResourceFieldSchema();
> -        AbstractType validator = marshallers.get(1);
> -        valSchema.setName("value");
> -        valSchema.setType(getPigType(validator));
> -        tupleFields.add(valSchema);
> -
>          // defined validators/indexes
>          for (ColumnDef cdef : cfDef.column_metadata)
>          {
> -            colSchema = new ResourceFieldSchema();
> -            colSchema.setName(new String(cdef.getName()));
> -            colSchema.setType(getPigType(marshallers.get(0)));
> -            tupleFields.add(colSchema);
> -
> -            valSchema = new ResourceFieldSchema();
> -            validator = validators.get(cdef.getName());
> +            ResourceFieldSchema valSchema = new ResourceFieldSchema();
> +            AbstractType validator = validators.get(cdef.getName());
>              if (validator == null)
>                  validator = marshallers.get(1);
> -            valSchema.setName("value");
> +            valSchema.setName(new String(cdef.getName()));
>              valSchema.setType(getPigType(validator));
> -            tupleFields.add(valSchema);
> +            tupleFields[tupleIndex++] = valSchema;
>          }
> -        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
>  
> -        // a bag can contain only one tuple, but that tuple can contain anything
> -        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> -        bagFieldSchema.setSchema(bagSchema);
>          // top level schema contains everything
> -        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
> +        schema.setFields(tupleFields);
>          return schema;
>      }
>  
> @@ -601,7 +563,7 @@
>          TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
>          try
>          {
> -            return Hex.bytesToHex(serializer.serialize(cfDef));
> +            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
>          }
>          catch (TException e)
>          {
> @@ -616,7 +578,7 @@
>          CfDef cfDef = new CfDef();
>          try
>          {
> -            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> +            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
>          }
>          catch (TException e)
>          {
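The shape mismatch in the report can be sketched with a small, self-contained Java snippet (a hypothetical illustration with no Pig or Cassandra dependencies; the class name SchemaMismatchSketch is invented, and the field names and values are taken from the DESCRIBE/DUMP output quoted above). getSchema() advertises one wide inner tuple with a name/value field pair per column (8 columns, 16 fields), while getNext() actually emits a bag of 7 two-field (name, value) tuples for the sample row:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in modeling the two shapes described in the report.
public class SchemaMismatchSketch {
    // Field names taken verbatim from the DESCRIBE output:
    // one (name, value) pair of fields per column -> 16 fields.
    static final List<String> REPORTED_INNER_TUPLE = Arrays.asList(
        "name", "value",
        "photo_owner", "value_photo_owner",
        "pid", "value_pid",
        "matched_string", "value_matched_string",
        "src_big", "value_src_big",
        "time", "value_time",
        "vote_type", "value_vote_type",
        "voter", "value_voter");

    // (name, value) pairs taken from the DUMP output: a bag of
    // 7 tuples, each holding exactly two fields.
    static final List<List<String>> ACTUAL_BAG = Arrays.asList(
        Arrays.asList("photo_owner", "1596090180"),
        Arrays.asList("pid", "6855155124568798560"),
        Arrays.asList("matched_string", ""),
        Arrays.asList("src_big", ""),
        Arrays.asList("time", "Thu Oct 06 14:39:48 -0700 2011"),
        Arrays.asList("vote_type", "album_dislike"),
        Arrays.asList("voter", "691831038"));

    public static void main(String[] args) {
        System.out.println("fields in reported inner tuple: " + REPORTED_INNER_TUPLE.size());
        System.out.println("tuples in actual bag: " + ACTUAL_BAG.size());
        System.out.println("fields per actual tuple: " + ACTUAL_BAG.get(0).size());
    }
}
```

A schema promising 16 named fields per inner tuple cannot describe data that arrives as 7 two-field tuples, which is why Pig raises "Incompatible field schema" errors when scripts reference columns by name.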
