You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/03 00:20:55 UTC

[3/7] git commit: Allow overriding default input/output formats. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3826.

Allow overriding the default input/output formats.
Patch by brandonwilliams; reviewed by xedin for CASSANDRA-3826.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7f1e7a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7f1e7a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7f1e7a6

Branch: refs/heads/trunk
Commit: a7f1e7a62d882a97fe15cbd8eee94e01333cddef
Parents: 0d768e1
Author: Brandon Williams <br...@apache.org>
Authored: Thu Feb 2 11:44:03 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Feb 2 13:06:12 2012 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java     |   42 +++++++++++++--
 1 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f1e7a6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index eec516f..dc6ec90 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -71,7 +72,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
 
+    private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
+    private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+    
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
@@ -86,6 +92,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private Configuration conf;
     private RecordReader reader;
     private RecordWriter writer;
+    private String inputFormatClass;
+    private String outputFormatClass;
     private int limit;
 
     public CassandraStorage()
@@ -247,7 +255,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     @Override
     public InputFormat getInputFormat()
     {
-        return new ColumnFamilyInputFormat();
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
@@ -329,12 +344,24 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         {
             ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
             ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-    }
+        }
         if(System.getenv(PIG_INPUT_PARTITIONER) != null)
             ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
         if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
             ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
-
+        if (System.getenv(PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+    }
+    
+    private String getFullyQualifiedClassName(String classname)
+    {
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
     }
 
     @Override
@@ -505,7 +532,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     public OutputFormat getOutputFormat()
     {
-        return new ColumnFamilyOutputFormat();
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public void checkSchema(ResourceSchema schema) throws IOException