Posted to commits@cassandra.apache.org by "Jeremy Hanna (Commented) (JIRA)" <ji...@apache.org> on 2011/11/09 21:59:53 UTC

[jira] [Commented] (CASSANDRA-3371) Cassandra inferred schema and actual data don't match

    [ https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13147282#comment-13147282 ] 

Jeremy Hanna commented on CASSANDRA-3371:
-----------------------------------------

Brandon, Jacob Perkins, and I had a long discussion about how to address this, along with other issues in CassandraStorage.  We came down to a list of 3-4 changes that, if implemented, would resolve the problem with Pig 0.9 and make CassandraStorage much more usable.

1. Fix the schema so that this ticket's problem is resolved - this goes along with #2.

2. Have the default return value from CassandraStorage be (key, column, value), as one would when transposing wide rows.  If something like pygmalion's FromCassandraBag is specified in the constructor, return that form instead.  See pygmalion's documentation for details.

3. Inspect what is passed in for output; if it conforms to pygmalion's ToCassandraBag format - namely a key and a bunch of columns - introspect the Pig schema and use the Pig variable names as the column names.

4. Optionally handle the uniqueness case with some kind of context/random identifier so that multiple CassandraStorage instances writing output don't get their schemas confused.
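For illustration only - this is not from the patch or from pygmalion - a minimal sketch of the transposition proposed in #2, where a wide row becomes one (key, column, value) triple per column.  The function name and data here are hypothetical; the row values are taken from the DUMP output quoted below:

```python
# Hypothetical sketch of the proposed (key, column, value) default output.
# Not CassandraStorage code; names and structures are illustrative only.

def transpose_wide_row(key, columns):
    """Yield one (key, column_name, value) triple per column in the row."""
    for name, value in columns:
        yield (key, name, value)

# A wide row as (name, value) pairs, echoing the DUMP output in this ticket.
row_key = "691831038_1317937188.48955"
row_columns = [("vote_type", "album_dislike"), ("voter", "691831038")]

triples = list(transpose_wide_row(row_key, row_columns))
# Each output tuple pairs the row key with exactly one column, so the
# schema is fixed at three fields regardless of how wide the row is.
```

The point of this shape is that the schema no longer depends on the number of columns in any particular row, which is what breaks the inferred schema in Pig 0.9.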
                
> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
>                 Key: CASSANDRA-3371
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Hadoop
>    Affects Versions: 0.8.7
>            Reporter: Pete Warden
>            Assignee: Brandon Williams
>         Attachments: 3371-v2.txt, 3371-v3.txt, pig.diff
>
>
> It's looking like there may be a mismatch between the schema that's being reported by the latest CassandraStorage.java, and the data that's actually returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type: chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which contains 16 values. In fact, getNext() seems to return an inner bag containing 7 tuples, each of which contains two values. 
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string: bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray}
> Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java	(revision 1185044)
> +++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java	(working copy)
> @@ -26,7 +26,7 @@
>  import org.apache.cassandra.db.marshal.IntegerType;
>  import org.apache.cassandra.db.marshal.TypeParser;
>  import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  
> @@ -122,15 +122,15 @@
>              assert key != null && cf != null;
>              
>              // and wrap it in a tuple
> -	        Tuple tuple = TupleFactory.getInstance().newTuple(2);
> +	        Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
>              ArrayList<Tuple> columns = new ArrayList<Tuple>();
> -            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> +            int tupleIndex = 0;
> +            tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));            
>              for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
>              {
> -                columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
> +                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), entry.getValue(), cfDef));
>              }
>  
> -            tuple.set(1, new DefaultDataBag(columns));
>              return tuple;
>          }
>          catch (InterruptedException e)
> @@ -139,30 +139,22 @@
>          }
>      }
>  
> -    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
> +    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
>      {
> -        Tuple pair = TupleFactory.getInstance().newTuple(2);
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>  
> -        setTupleValue(pair, 0, marshallers.get(0).compose(name));
>          if (col instanceof Column)
>          {
>              // standard
>              if (validators.get(name) == null)
> -                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
> +                return marshallers.get(1).compose(col.value());
>              else
> -                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
> -            return pair;
> +                return validators.get(name).compose(col.value());
>          }
>  
> -        // super
> -        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> -        for (IColumn subcol : col.getSubColumns())
> -            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -        
> -        pair.set(1, new DefaultDataBag(subcols));
> -        return pair;
> +        // super not currently handled
> +        return null;
>      }
>  
>      private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
> @@ -312,62 +304,32 @@
>          // top-level schema, no type
>          ResourceSchema schema = new ResourceSchema();
>  
> +        ResourceFieldSchema[] tupleFields = new ResourceFieldSchema[cfDef.column_metadata.size()+1];
> +        int tupleIndex = 0;
> +        
>          // add key
>          ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> -        keyFieldSchema.setName("key");
> +        keyFieldSchema.setName("cassandra_key");
>          keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> +        tupleFields[tupleIndex++] = keyFieldSchema;
>  
> -        // will become the bag of tuples
> -        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> -        bagFieldSchema.setName("columns");
> -        bagFieldSchema.setType(DataType.BAG);
> -        ResourceSchema bagSchema = new ResourceSchema();
> -
> -
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> -        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
>  
> -        // default comparator/validator
> -        ResourceSchema innerTupleSchema = new ResourceSchema();
> -        ResourceFieldSchema tupleField = new ResourceFieldSchema();
> -        tupleField.setType(DataType.TUPLE);
> -        tupleField.setSchema(innerTupleSchema);
> -
> -        ResourceFieldSchema colSchema = new ResourceFieldSchema();
> -        colSchema.setName("name");
> -        colSchema.setType(getPigType(marshallers.get(0)));
> -        tupleFields.add(colSchema);
> -
> -        ResourceFieldSchema valSchema = new ResourceFieldSchema();
> -        AbstractType validator = marshallers.get(1);
> -        valSchema.setName("value");
> -        valSchema.setType(getPigType(validator));
> -        tupleFields.add(valSchema);
> -
>          // defined validators/indexes
>          for (ColumnDef cdef : cfDef.column_metadata)
>          {
> -            colSchema = new ResourceFieldSchema();
> -            colSchema.setName(new String(cdef.getName()));
> -            colSchema.setType(getPigType(marshallers.get(0)));
> -            tupleFields.add(colSchema);
> -
> -            valSchema = new ResourceFieldSchema();
> -            validator = validators.get(cdef.getName());
> +            ResourceFieldSchema valSchema = new ResourceFieldSchema();
> +            AbstractType validator = validators.get(cdef.getName());
>              if (validator == null)
>                  validator = marshallers.get(1);
> -            valSchema.setName("value");
> +            valSchema.setName(new String(cdef.getName()));
>              valSchema.setType(getPigType(validator));
> -            tupleFields.add(valSchema);
> +            tupleFields[tupleIndex++] = valSchema;
>          }
> -        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
>  
> -        // a bag can contain only one tuple, but that tuple can contain anything
> -        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> -        bagFieldSchema.setSchema(bagSchema);
>          // top level schema contains everything
> -        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
> +        schema.setFields(tupleFields);
>          return schema;
>      }
>  
> @@ -601,7 +563,7 @@
>          TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
>          try
>          {
> -            return Hex.bytesToHex(serializer.serialize(cfDef));
> +            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
>          }
>          catch (TException e)
>          {
> @@ -616,7 +578,7 @@
>          CfDef cfDef = new CfDef();
>          try
>          {
> -            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> +            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
>          }
>          catch (TException e)
>          {

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira