You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/03 00:20:55 UTC
[3/7] git commit: Allow overriding default input/outputformats. Patch
by brandonwilliams, reviewed by xedin for CASSANDRA-3826
Allow overriding default input/outputformats
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3826
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7f1e7a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7f1e7a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7f1e7a6
Branch: refs/heads/trunk
Commit: a7f1e7a62d882a97fe15cbd8eee94e01333cddef
Parents: 0d768e1
Author: Brandon Williams <br...@apache.org>
Authored: Thu Feb 2 11:44:03 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Feb 2 13:06:12 2012 -0600
----------------------------------------------------------------------
.../cassandra/hadoop/pig/CassandraStorage.java | 42 +++++++++++++--
1 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f1e7a6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index eec516f..dc6ec90 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -71,7 +72,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+ public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+ public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+ private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
+ private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -86,6 +92,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
private Configuration conf;
private RecordReader reader;
private RecordWriter writer;
+ private String inputFormatClass;
+ private String outputFormatClass;
private int limit;
public CassandraStorage()
@@ -247,7 +255,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
@Override
public InputFormat getInputFormat()
{
- return new ColumnFamilyInputFormat();
+ try
+ {
+ return FBUtilities.construct(inputFormatClass, "inputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -329,12 +344,24 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
{
ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
- }
+ }
if(System.getenv(PIG_INPUT_PARTITIONER) != null)
ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
-
+ if (System.getenv(PIG_INPUT_FORMAT) != null)
+ inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
+ else
+ inputFormatClass = DEFAULT_INPUT_FORMAT;
+ if (System.getenv(PIG_OUTPUT_FORMAT) != null)
+ outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
+ else
+ outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+ }
+
+ private String getFullyQualifiedClassName(String classname)
+ {
+ return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
}
@Override
@@ -505,7 +532,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
public OutputFormat getOutputFormat()
{
- return new ColumnFamilyOutputFormat();
+ try
+ {
+ return FBUtilities.construct(outputFormatClass, "outputformat");
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public void checkSchema(ResourceSchema schema) throws IOException