You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2012/10/01 21:05:08 UTC

git commit: Add support for writing to multiple column families in CFOF

Updated Branches:
  refs/heads/trunk 937f15e17 -> e05a5fc12


Add support for writing to multiple column families in CFOF

Patch by Robbie Strickland; Reviewed by tjake for CASSANDRA-4208


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e05a5fc1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e05a5fc1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e05a5fc1

Branch: refs/heads/trunk
Commit: e05a5fc12648f315002c9939a2a0748d74525589
Parents: 937f15e
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Oct 1 20:56:16 2012 +0200
Committer: Jake Luciani <jl...@bluemountaincapital.com>
Committed: Mon Oct 1 20:59:32 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 build.xml                                          |    2 +-
 .../apache/cassandra/hadoop/BulkOutputFormat.java  |    6 +-
 .../cassandra/hadoop/ColumnFamilyOutputFormat.java |    6 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   49 ++++++++++----
 5 files changed, 41 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 89ee88d..d3932b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
    (CASSANDRA-4706)
  * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
  * Add support for batchlog in CQL3 (CASSANDRA-4545)
+ * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
 
 1.2-beta1
  * add atomic_batch_mutate (CASSANDRA-4542, -4635)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7c288cc..749029e 100644
--- a/build.xml
+++ b/build.xml
@@ -362,7 +362,7 @@
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.6">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
-          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="0.20.203.0"/>
+          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.2"/>
           <dependency groupId="org.apache.pig" artifactId="pig" version="0.9.2"/>
           <dependency groupId="net.sf.jopt-simple" artifactId="jopt-simple" version="3.2"/>
           <dependency groupId="net.java.dev.jna" artifactId="jna" version="3.2.7"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index 7a6a1d7..f4ad9d9 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -37,10 +37,8 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
 
     private void checkOutputSpecs(Configuration conf)
     {
-        if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
-        }
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index b872082..64e080b 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -82,10 +82,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
 
     private void checkOutputSpecs(Configuration conf)
     {
-        if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
-        }
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
         if (ConfigHelper.getOutputPartitioner(conf) == null)
             throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
         if (ConfigHelper.getOutputInitialAddress(conf) == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 6282143..a2a461b 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -58,7 +58,7 @@ public class ConfigHelper
     private static final String OUTPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.output.keyspace.username";
     private static final String OUTPUT_KEYSPACE_PASSWD_CONFIG = "cassandra.output.keyspace.passwd";
     private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
-    private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
+    private static final String OUTPUT_COLUMNFAMILY_CONFIG = "mapreduce.output.basename"; //this must == OutputFormat.BASE_OUTPUT_NAME
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
     private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
@@ -117,25 +117,43 @@ public class ConfigHelper
     }
 
     /**
-     * Set the keyspace and column family for the output of this job.
+     * Set the keyspace for the output of this job.
      *
      * @param conf Job configuration you are about to run
      * @param keyspace
-     * @param columnFamily
      */
-    public static void setOutputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+    public static void setOutputKeyspace(Configuration conf, String keyspace)
     {
         if (keyspace == null)
         {
             throw new UnsupportedOperationException("keyspace may not be null");
         }
-        if (columnFamily == null)
-        {
-            throw new UnsupportedOperationException("columnfamily may not be null");
-        }
 
         conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace);
-        conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily);
+    }
+    
+    /**
+     * Set the column family for the output of this job.
+     *
+     * @param conf         Job configuration you are about to run
+     * @param columnFamily
+     */
+    public static void setOutputColumnFamily(Configuration conf, String columnFamily)
+    {
+    	conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily);
+    }
+    
+    /**
+     * Set the keyspace and column family for the output of this job.
+     *
+     * @param conf         Job configuration you are about to run
+     * @param keyspace
+     * @param columnFamily
+     */
+    public static void setOutputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+    {
+    	setOutputKeyspace(conf, keyspace);
+    	setOutputColumnFamily(conf, columnFamily);
     }
 
     /**
@@ -329,15 +347,18 @@ public class ConfigHelper
     {
         return conf.get(INPUT_COLUMNFAMILY_CONFIG);
     }
-
-    public static boolean getInputIsWide(Configuration conf)
+    
+    public static String getOutputColumnFamily(Configuration conf)
     {
-        return Boolean.parseBoolean(conf.get(INPUT_WIDEROWS_CONFIG));
+    	if (conf.get(OUTPUT_COLUMNFAMILY_CONFIG) != null)
+    		return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
+    	else
+    		throw new UnsupportedOperationException("You must set the output column family using either setOutputColumnFamily or by adding a named output with MultipleOutputs");
     }
 
-    public static String getOutputColumnFamily(Configuration conf)
+    public static boolean getInputIsWide(Configuration conf)
     {
-        return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
+        return Boolean.valueOf(conf.get(INPUT_WIDEROWS_CONFIG));
     }
 
     public static String getReadConsistencyLevel(Configuration conf)