Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/14 00:44:29 UTC

[3/3] git commit: Pig: inferred and actual schema match, smoke tests. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3371

Pig: inferred and actual schema match, smoke tests.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3371
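
With this change, the schema CassandraStorage reports and the tuples it
actually emits have the same shape: the key comes first, each column that has
metadata follows as its own named (name, value) tuple, and columns without
metadata land in a trailing bag. A rough Pig sketch against the test fixtures
added below (the column names come from those fixtures, not from any fixed
API):

    rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
    -- rows: (key, name:(name, value), ..., columns:{(name, value)})
    names = FOREACH rows GENERATE key, name.value;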


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e48b29a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e48b29a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e48b29a6

Branch: refs/heads/trunk
Commit: e48b29a6b0b15f9e1c8d8e462a573daa04d1ec5e
Parents: 9ca8478
Author: Brandon Williams <br...@apache.org>
Authored: Mon Feb 13 17:28:47 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Feb 13 17:28:47 2012 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |  304 +++++++++------
 contrib/pig/test/populate-cli.txt                  |   67 ++++
 contrib/pig/test/test_storage.pig                  |   22 +
 3 files changed, 283 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48b29a6/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 975d5ba..7e55ee0 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.Mutation;
@@ -50,15 +50,10 @@ import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Cassandra
@@ -85,8 +80,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String storeSignature;
 
     private Configuration conf;
-    private RecordReader reader;
-    private RecordWriter writer;
+    private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
+    private RecordWriter<ByteBuffer, List<Mutation>> writer;
     private int limit;
 
     public CassandraStorage()
@@ -118,20 +113,37 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 return null;
 
             CfDef cfDef = getCfDef(loadSignature);
-            ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
-            SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+            ByteBuffer key = reader.getCurrentKey();
+            Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
             assert key != null && cf != null;
-            
-            // and wrap it in a tuple
-            Tuple tuple = TupleFactory.getInstance().newTuple(2);
-            ArrayList<Tuple> columns = new ArrayList<Tuple>();
-            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+
+            // output tuple: holds the key, then one tuple per indexed column, then a bag of the rest
+            Tuple tuple = TupleFactory.getInstance().newTuple();
+            DefaultDataBag bag = new DefaultDataBag();
+            // set the key
+            tuple.append(new DataByteArray(ByteBufferUtil.getArray(key)));
+            // we must add all the indexed columns first to match the schema
+            Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
+            // take care to iterate these in the same order as the schema does
+            for (ColumnDef cdef : cfDef.column_metadata)
+            {
+                if (cf.containsKey(cdef.name))
+                {
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
+                }
+                else
+                {   // otherwise, we need to add an empty tuple to take its place
+                    tuple.append(TupleFactory.getInstance().newTuple());
+                }
+                added.put(cdef.name, true);
+            }
+            // now add all the other columns
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
             {
-                columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                if (!added.containsKey(entry.getKey()))
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
-
-            tuple.set(1, new DefaultDataBag(columns));
+            tuple.append(bag);
             return tuple;
         }
         catch (InterruptedException e)
@@ -334,6 +346,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
+        if (loadSignature == null)
+            loadSignature = location;
         initSchema(loadSignature);
     }
 
@@ -344,6 +358,13 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
         if (cfDef.column_type.equals("Super"))
             return null;
+        /*
+        Our returned schema should look like this:
+        (key, index1:(name, value), index2:(name, value), columns:{(name, value)})
+        Which is to say, columns that have metadata will be returned as named tuples, but unknown columns will go into a bag.
+        This way, wide rows can still be handled by the bag, but known columns can easily be referenced.
+         */
+
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
 
@@ -356,54 +377,59 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         keyFieldSchema.setName("key");
         keyFieldSchema.setType(getPigType(marshallers.get(2)));
 
-        // will become the bag of tuples
-        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
-        bagFieldSchema.setName("columns");
-        bagFieldSchema.setType(DataType.BAG);
         ResourceSchema bagSchema = new ResourceSchema();
-
-        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
-
-        // default comparator/validator
-        ResourceSchema innerTupleSchema = new ResourceSchema();
-        ResourceFieldSchema tupleField = new ResourceFieldSchema();
-        tupleField.setType(DataType.TUPLE);
-        tupleField.setSchema(innerTupleSchema);
-
-        ResourceFieldSchema colSchema = new ResourceFieldSchema();
-        colSchema.setName("name");
-        colSchema.setType(getPigType(marshallers.get(0)));
-        tupleFields.add(colSchema);
-
-        ResourceFieldSchema valSchema = new ResourceFieldSchema();
-        AbstractType validator = marshallers.get(1);
-        valSchema.setName("value");
-        valSchema.setType(getPigType(validator));
-        tupleFields.add(valSchema);
+        ResourceFieldSchema bagField = new ResourceFieldSchema();
+        bagField.setType(DataType.BAG);
+        bagField.setName("columns");
+        // inside the bag, place one tuple with the default comparator/validator schema
+        ResourceSchema bagTupleSchema = new ResourceSchema();
+        ResourceFieldSchema bagTupleField = new ResourceFieldSchema();
+        bagTupleField.setType(DataType.TUPLE);
+        ResourceFieldSchema bagcolSchema = new ResourceFieldSchema();
+        ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
+        bagcolSchema.setName("name");
+        bagvalSchema.setName("value");
+        bagcolSchema.setType(getPigType(marshallers.get(0)));
+        bagvalSchema.setType(getPigType(marshallers.get(1)));
+        bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
+        bagTupleField.setSchema(bagTupleSchema);
+        bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
+        bagField.setSchema(bagSchema);
+
+        // will contain all fields for this schema
+        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+        // add the key first, then the indexed columns, and finally the bag
+        allSchemaFields.add(keyFieldSchema);
 
         // defined validators/indexes
         for (ColumnDef cdef : cfDef.column_metadata)
         {
-            colSchema = new ResourceFieldSchema();
-            colSchema.setName(new String(cdef.getName()));
-            colSchema.setType(getPigType(marshallers.get(0)));
-            tupleFields.add(colSchema);
-
-            valSchema = new ResourceFieldSchema();
-            validator = validators.get(ByteBuffer.wrap(cdef.getName()));
+            // make a new tuple for each col/val pair
+            ResourceSchema innerTupleSchema = new ResourceSchema();
+            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+            innerTupleField.setType(DataType.TUPLE);
+            innerTupleField.setSchema(innerTupleSchema);
+            innerTupleField.setName(new String(cdef.getName()));
+
+            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+            idxColSchema.setName("name");
+            idxColSchema.setType(getPigType(marshallers.get(0)));
+
+            ResourceFieldSchema valSchema = new ResourceFieldSchema();
+            AbstractType validator = validators.get(cdef.name);
             if (validator == null)
                 validator = marshallers.get(1);
             valSchema.setName("value");
             valSchema.setType(getPigType(validator));
-            tupleFields.add(valSchema);
+
+            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+            allSchemaFields.add(innerTupleField);
         }
-        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
+        // bag at the end for unknown columns
+        allSchemaFields.add(bagField);
 
-        // a bag can contain only one tuple, but that tuple can contain anything
-        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
-        bagFieldSchema.setSchema(bagSchema);
         // top level schema contains everything
-        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
+        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
         return schema;
     }
 
@@ -502,79 +528,137 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             return DoubleType.instance.decompose((Double)o);
         if (o instanceof UUID)
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+
         return ByteBuffer.wrap(((DataByteArray) o).get());
     }
 
-    public void putNext(Tuple t) throws ExecException, IOException
+    public void putNext(Tuple t) throws IOException
     {
+        /*
+        We support two cases for output:
+        the original form, where everything is in one bag: (key, {(name, value)}),
+        and the schema-matched form: (key, (name, value), ..., {(name, value)}) (tuples and bag each optional).
+        For supers, we only accept the original (bag) form.
+        */
+
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped; filter empty tuples to suppress this warning");
+            return;
+        }
         ByteBuffer key = objToBB(t.get(0));
-        DefaultDataBag pairs = (DefaultDataBag) t.get(1);
+        if (t.getType(1) == DataType.TUPLE)
+            writeColumnsFromTuple(key, t, 1);
+        else if (t.getType(1) == DataType.BAG)
+        {
+            if (t.size() > 2)
+                throw new IOException("No arguments allowed after bag");
+            writeColumnsFromBag(key, (DefaultDataBag) t.get(1));
+        }
+        else
+            throw new IOException("Second argument in output must be a tuple or bag");
+    }
+
+    private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
+    {
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
-        CfDef cfDef = getCfDef(storeSignature);
-        try
+        for (int i = offset; i < t.size(); i++)
         {
-            for (Tuple pair : pairs)
+            if (t.getType(i) == DataType.BAG)
+                writeColumnsFromBag(key, (DefaultDataBag) t.get(i));
+            else if (t.getType(i) == DataType.TUPLE)
             {
-               Mutation mutation = new Mutation();
-               if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
-               {
-                   org.apache.cassandra.thrift.SuperColumn sc = new org.apache.cassandra.thrift.SuperColumn();
-                   sc.name = objToBB(pair.get(0));
-                   ArrayList<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
-                   for (Tuple subcol : (DefaultDataBag) pair.get(1))
-                   {
-                       org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-                       column.name = objToBB(subcol.get(0));
-                       column.value = objToBB(subcol.get(1));
-                       column.setTimestamp(System.currentTimeMillis() * 1000);
-                       columns.add(column);
-                   }
-                   if (columns.isEmpty()) // a deletion
-                   {
-                       mutation.deletion = new Deletion();
-                       mutation.deletion.super_column = objToBB(pair.get(0));
-                       mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
-                   }
-                   else
-                   {
-                       sc.columns = columns;
-                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
-                       mutation.column_or_supercolumn.super_column = sc;
-                   }
-               }
-               else // assume column since it couldn't be anything else
-               {
-                   if (pair.get(1) == null)
-                   {
-                       mutation.deletion = new Deletion();
-                       mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
-                       mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
-                       mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
-                   }
-                   else
-                   {
-                       org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-                       column.name = objToBB(pair.get(0));
-                       column.value = objToBB(pair.get(1));
-                       column.setTimestamp(System.currentTimeMillis() * 1000);
-                       mutation.column_or_supercolumn = new ColumnOrSuperColumn();
-                       mutation.column_or_supercolumn.column = column;
-                   }
-               }
-               mutationList.add(mutation);
+                Tuple inner = (Tuple) t.get(i);
+                if (inner.size() > 0) // may be empty, for an indexed column that wasn't present
+                    mutationList.add(mutationFromTuple(inner));
             }
+            else
+                throw new IOException("Output type was not a bag or a tuple");
         }
-        catch (ClassCastException e)
+        if (mutationList.size() > 0)
+            writeMutations(key, mutationList);
+    }
+
+    private Mutation mutationFromTuple(Tuple t) throws IOException
+    {
+        Mutation mutation = new Mutation();
+        if (t.get(1) == null)
         {
-            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
+            // TODO: optional deletion
+            mutation.deletion = new Deletion();
+            mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
+            mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
+            mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
         }
+        else
+        {
+            org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+            column.setName(objToBB(t.get(0)));
+            column.setValue(objToBB(t.get(1)));
+            column.setTimestamp(FBUtilities.timestampMicros());
+            mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+            mutation.column_or_supercolumn.column = column;
+        }
+        return mutation;
+    }
+
+    private void writeColumnsFromBag(ByteBuffer key, DefaultDataBag bag) throws IOException
+    {
+        List<Mutation> mutationList = new ArrayList<Mutation>();
+        for (Tuple pair : bag)
+        {
+            Mutation mutation = new Mutation();
+            if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
+            {
+                SuperColumn sc = new SuperColumn();
+                sc.setName(objToBB(pair.get(0)));
+                List<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
+                for (Tuple subcol : (DefaultDataBag) pair.get(1))
+                {
+                    org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+                    column.setName(objToBB(subcol.get(0)));
+                    column.setValue(objToBB(subcol.get(1)));
+                    column.setTimestamp(FBUtilities.timestampMicros());
+                    columns.add(column);
+                }
+                if (columns.isEmpty()) // TODO: optional deletion
+                {
+                    mutation.deletion = new Deletion();
+                    mutation.deletion.super_column = objToBB(pair.get(0));
+                    mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+                }
+                else
+                {
+                    sc.columns = columns;
+                    mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+                    mutation.column_or_supercolumn.super_column = sc;
+                }
+            }
+            else
+                mutation = mutationFromTuple(pair);
+            mutationList.add(mutation);
+            // for wide rows, we need to limit the number of mutations we write at once
+            if (mutationList.size() >= 10) // arbitrary batch size; ColumnFamilyOutputFormat will re-batch this up, and BulkOutputFormat won't care
+            {
+                writeMutations(key, mutationList);
+                mutationList.clear();
+            }
+        }
+        // write the last batch
+        if (mutationList.size() > 0)
+            writeMutations(key, mutationList);
+    }
+
+    private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
+    {
         try
         {
-            writer.write(key, mutationList);
+            writer.write(key, mutations);
         }
         catch (InterruptedException e)
         {
-           throw new IOException(e);
+            throw new IOException(e);
         }
     }
 

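The rewritten putNext() above dispatches on the type of the second field: a
tuple takes the schema-matched path (writeColumnsFromTuple), while a bag takes
the original path (writeColumnsFromBag). As a hedged sketch, these two
statements from the smoke test exercise the tuple route and the bag route
respectively:

    -- schema-matched shape: key plus individual column tuples
    onecol = FOREACH rows GENERATE key, percent;
    -- original shape: key plus a single bag of (name, value) tuples
    other = FOREACH rows GENERATE key, columns;
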
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48b29a6/contrib/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --git a/contrib/pig/test/populate-cli.txt b/contrib/pig/test/populate-cli.txt
new file mode 100644
index 0000000..665fba4
--- /dev/null
+++ b/contrib/pig/test/populate-cli.txt
@@ -0,0 +1,67 @@
+create keyspace PigTest;
+use PigTest;
+create column family SomeApp with
+key_validation_class = UTF8Type and
+default_validation_class = LexicalUUIDType and
+comparator = UTF8Type and
+column_metadata =
+[
+    {column_name: name, validation_class: UTF8Type, index_type: KEYS},
+    {column_name: vote_type, validation_class: UTF8Type},
+    {column_name: rating, validation_class: IntegerType},
+    {column_name: score, validation_class: LongType},
+    {column_name: percent, validation_class: FloatType},
+    {column_name: atomic_weight, validation_class: DoubleType},
+];
+
+create column family CopyOfSomeApp with
+key_validation_class = UTF8Type and
+default_validation_class = LexicalUUIDType and
+comparator = UTF8Type and
+column_metadata =
+[
+    {column_name: name, validation_class: UTF8Type, index_type: KEYS},
+    {column_name: vote_type, validation_class: UTF8Type},
+    {column_name: rating, validation_class: IntegerType},
+    {column_name: score, validation_class: LongType},
+    {column_name: percent, validation_class: FloatType},
+    {column_name: atomic_weight, validation_class: DoubleType},
+];
+
+set SomeApp['foo']['name'] = 'User Foo';
+set SomeApp['foo']['vote_type'] = 'like';
+set SomeApp['foo']['rating'] = 8;
+set SomeApp['foo']['score'] = 125000;
+set SomeApp['foo']['percent'] = '85.0';
+set SomeApp['foo']['atomic_weight'] = '2.7182818284590451';
+
+set SomeApp['bar']['name'] = 'User Bar';
+set SomeApp['bar']['vote_type'] = 'like';
+set SomeApp['bar']['rating'] = 9;
+set SomeApp['bar']['score'] = 15000;
+set SomeApp['bar']['percent'] = '35.0';
+set SomeApp['bar']['atomic_weight'] = '3.1415926535897931';
+
+set SomeApp['baz']['name'] = 'User Baz';
+set SomeApp['baz']['vote_type'] = 'dislike';
+set SomeApp['baz']['rating'] = 3;
+set SomeApp['baz']['score'] = 512000;
+set SomeApp['baz']['percent'] = '95.3';
+set SomeApp['baz']['atomic_weight'] = '1.61803399';
+set SomeApp['baz']['extra1'] = lexicaluuid();
+set SomeApp['baz']['extra2'] = lexicaluuid();
+set SomeApp['baz']['extra3'] = lexicaluuid();
+
+set SomeApp['qux']['name'] = 'User Qux';
+set SomeApp['qux']['vote_type'] = 'dislike';
+set SomeApp['qux']['rating'] = 2;
+set SomeApp['qux']['score'] = 12000;
+set SomeApp['qux']['percent'] = '64.7';
+set SomeApp['qux']['atomic_weight'] = '0.660161815846869';
+set SomeApp['qux']['extra1'] = lexicaluuid();
+set SomeApp['qux']['extra2'] = lexicaluuid();
+set SomeApp['qux']['extra3'] = lexicaluuid();
+set SomeApp['qux']['extra4'] = lexicaluuid();
+set SomeApp['qux']['extra5'] = lexicaluuid();
+set SomeApp['qux']['extra6'] = lexicaluuid();
+set SomeApp['qux']['extra7'] = lexicaluuid();

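The 'extra*' columns set on 'baz' and 'qux' have no entry in column_metadata,
so on load they fall into the trailing 'columns' bag rather than into named
tuples; 'foo' and 'bar' load with an empty bag. The visibility filter at the
end of the test script relies on exactly this (a sketch restating the script's
own logic):

    -- rows with no unmapped columns carry an empty bag
    visible = FILTER rows BY COUNT(columns) == 0;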
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48b29a6/contrib/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/contrib/pig/test/test_storage.pig b/contrib/pig/test/test_storage.pig
new file mode 100644
index 0000000..22143dc
--- /dev/null
+++ b/contrib/pig/test/test_storage.pig
@@ -0,0 +1,22 @@
+rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
+-- full copy
+STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+-- single tuple
+onecol = FOREACH rows GENERATE key, percent;
+STORE onecol INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+-- bag only
+other = FOREACH rows GENERATE key, columns;
+STORE other INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+
+
+-- filter
+likes = FILTER rows BY vote_type.value eq 'like' AND rating.value > 5;
+dislikes_extras = FILTER rows BY vote_type.value eq 'dislike' AND COUNT(columns) > 0;
+
+-- store these too
+STORE likes INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+STORE dislikes_extras INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+
+-- filter to fully visible rows (no uuid columns) and dump
+visible = FILTER rows BY COUNT(columns) == 0;
+DUMP visible;
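
Because indexed columns are now first-class fields, they compose with the rest
of Pig directly; for example (a hedged illustration, not part of the committed
test script):

    -- group on a named column's value instead of scanning the bag
    by_vote = GROUP rows BY vote_type.value;
    counts = FOREACH by_vote GENERATE group, COUNT(rows);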