You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/21 15:14:14 UTC

[4/5] git commit: Pig: require deletes to be explicitly enabled. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3628

Pig: require deletes to be explicitly enabled.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3628


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d6c1bda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d6c1bda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d6c1bda

Branch: refs/heads/cassandra-1.1
Commit: 5d6c1bdad7107618fc165bec74fc0444078910f8
Parents: 161e052
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 14 12:21:47 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Feb 17 05:17:01 2012 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |   31 ++++++++++----
 1 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d6c1bda/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 7e55ee0..8863975 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -67,6 +67,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -74,6 +75,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
     private boolean slice_reverse = false;
+    private boolean allow_deletes = false;
     private String keyspace;
     private String column_family;
     private String loadSignature;
@@ -331,6 +333,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
         else if (ConfigHelper.getPartitioner(conf) == null) 
             throw new IOException("PIG_PARTITIONER environment variable not set");
+        if (System.getenv(PIG_ALLOW_DELETES) != null)
+            allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES));
     }
 
     @Override
@@ -585,11 +589,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         Mutation mutation = new Mutation();
         if (t.get(1) == null)
         {
-            // TODO: optional deletion
-            mutation.deletion = new Deletion();
-            mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
-            mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
-            mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+            if (allow_deletes)
+            {
+                mutation.deletion = new Deletion();
+                mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
+                mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
+                mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+            }
+            else
+                throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable");
         }
         else
         {
@@ -622,11 +630,16 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     column.setTimestamp(FBUtilities.timestampMicros());
                     columns.add(column);
                 }
-                if (columns.isEmpty()) // TODO: optional deletion
+                if (columns.isEmpty())
                 {
-                    mutation.deletion = new Deletion();
-                    mutation.deletion.super_column = objToBB(pair.get(0));
-                    mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+                    if (allow_deletes)
+                    {
+                        mutation.deletion = new Deletion();
+                        mutation.deletion.super_column = objToBB(pair.get(0));
+                        mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+                    }
+                    else
+                        throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable");
                 }
                 else
                 {