You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2012/10/01 21:05:08 UTC
git commit: Add support for writing to multiple column families in
CFOF
Updated Branches:
refs/heads/trunk 937f15e17 -> e05a5fc12
Add support for writing to multiple column families in CFOF
Patch by Robbie Strickland; Reviewed by tjake for CASSANDRA-4208
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e05a5fc1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e05a5fc1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e05a5fc1
Branch: refs/heads/trunk
Commit: e05a5fc12648f315002c9939a2a0748d74525589
Parents: 937f15e
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Oct 1 20:56:16 2012 +0200
Committer: Jake Luciani <jl...@bluemountaincapital.com>
Committed: Mon Oct 1 20:59:32 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +-
.../apache/cassandra/hadoop/BulkOutputFormat.java | 6 +-
.../cassandra/hadoop/ColumnFamilyOutputFormat.java | 6 +-
.../org/apache/cassandra/hadoop/ConfigHelper.java | 49 ++++++++++----
5 files changed, 41 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 89ee88d..d3932b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
(CASSANDRA-4706)
* Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
* Add support for batchlog in CQL3 (CASSANDRA-4545)
+ * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
1.2-beta1
* add atomic_batch_mutate (CASSANDRA-4542, -4635)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7c288cc..749029e 100644
--- a/build.xml
+++ b/build.xml
@@ -362,7 +362,7 @@
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.6">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
- <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="0.20.203.0"/>
+ <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.2"/>
<dependency groupId="org.apache.pig" artifactId="pig" version="0.9.2"/>
<dependency groupId="net.sf.jopt-simple" artifactId="jopt-simple" version="3.2"/>
<dependency groupId="net.java.dev.jna" artifactId="jna" version="3.2.7"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index 7a6a1d7..f4ad9d9 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -37,10 +37,8 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
private void checkOutputSpecs(Configuration conf)
{
- if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
- }
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index b872082..64e080b 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -82,10 +82,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
private void checkOutputSpecs(Configuration conf)
{
- if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
- {
- throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
- }
+ if (ConfigHelper.getOutputKeyspace(conf) == null)
+ throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
if (ConfigHelper.getOutputPartitioner(conf) == null)
throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
if (ConfigHelper.getOutputInitialAddress(conf) == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 6282143..a2a461b 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -58,7 +58,7 @@ public class ConfigHelper
private static final String OUTPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.output.keyspace.username";
private static final String OUTPUT_KEYSPACE_PASSWD_CONFIG = "cassandra.output.keyspace.passwd";
private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
- private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
+ private static final String OUTPUT_COLUMNFAMILY_CONFIG = "mapreduce.output.basename"; //this must == OutputFormat.BASE_OUTPUT_NAME
private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
@@ -117,25 +117,43 @@ public class ConfigHelper
}
/**
- * Set the keyspace and column family for the output of this job.
+ * Set the keyspace for the output of this job.
*
* @param conf Job configuration you are about to run
* @param keyspace
- * @param columnFamily
*/
- public static void setOutputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+ public static void setOutputKeyspace(Configuration conf, String keyspace)
{
if (keyspace == null)
{
throw new UnsupportedOperationException("keyspace may not be null");
}
- if (columnFamily == null)
- {
- throw new UnsupportedOperationException("columnfamily may not be null");
- }
conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace);
- conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily);
+ }
+
+ /**
+ * Set the column family for the output of this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param columnFamily
+ */
+ public static void setOutputColumnFamily(Configuration conf, String columnFamily)
+ {
+ conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily);
+ }
+
+ /**
+ * Set the keyspace and column family for the output of this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param keyspace
+ * @param columnFamily
+ */
+ public static void setOutputColumnFamily(Configuration conf, String keyspace, String columnFamily)
+ {
+ setOutputKeyspace(conf, keyspace);
+ setOutputColumnFamily(conf, columnFamily);
}
/**
@@ -329,15 +347,18 @@ public class ConfigHelper
{
return conf.get(INPUT_COLUMNFAMILY_CONFIG);
}
-
- public static boolean getInputIsWide(Configuration conf)
+
+ public static String getOutputColumnFamily(Configuration conf)
{
- return Boolean.parseBoolean(conf.get(INPUT_WIDEROWS_CONFIG));
+ if (conf.get(OUTPUT_COLUMNFAMILY_CONFIG) != null)
+ return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
+ else
+ throw new UnsupportedOperationException("You must set the output column family using either setOutputColumnFamily or by adding a named output with MultipleOutputs");
}
- public static String getOutputColumnFamily(Configuration conf)
+ public static boolean getInputIsWide(Configuration conf)
{
- return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
+ return Boolean.valueOf(conf.get(INPUT_WIDEROWS_CONFIG));
}
public static String getReadConsistencyLevel(Configuration conf)