You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/21 15:14:28 UTC
[5/6] git commit: Pig: require deletes to be explicitly enabled.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3628
Pig: require deletes to be explicitly enabled.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3628
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d6c1bda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d6c1bda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d6c1bda
Branch: refs/heads/trunk
Commit: 5d6c1bdad7107618fc165bec74fc0444078910f8
Parents: 161e052
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 14 12:21:47 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Feb 17 05:17:01 2012 -0600
----------------------------------------------------------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 31 ++++++++++----
1 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d6c1bda/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 7e55ee0..8863975 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -67,6 +67,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+ public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -74,6 +75,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private ByteBuffer slice_start = BOUND;
private ByteBuffer slice_end = BOUND;
private boolean slice_reverse = false;
+ private boolean allow_deletes = false;
private String keyspace;
private String column_family;
private String loadSignature;
@@ -331,6 +333,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
else if (ConfigHelper.getPartitioner(conf) == null)
throw new IOException("PIG_PARTITIONER environment variable not set");
+ if (System.getenv(PIG_ALLOW_DELETES) != null)
+ allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES));
}
@Override
@@ -585,11 +589,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
Mutation mutation = new Mutation();
if (t.get(1) == null)
{
- // TODO: optional deletion
- mutation.deletion = new Deletion();
- mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
- mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
- mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+ if (allow_deletes)
+ {
+ mutation.deletion = new Deletion();
+ mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
+ mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
+ mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+ }
+ else
+ throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable");
}
else
{
@@ -622,11 +630,16 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
column.setTimestamp(FBUtilities.timestampMicros());
columns.add(column);
}
- if (columns.isEmpty()) // TODO: optional deletion
+ if (columns.isEmpty())
{
- mutation.deletion = new Deletion();
- mutation.deletion.super_column = objToBB(pair.get(0));
- mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+ if (allow_deletes)
+ {
+ mutation.deletion = new Deletion();
+ mutation.deletion.super_column = objToBB(pair.get(0));
+ mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+ }
+ else
+ throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable");
}
else
{