You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/12/01 19:02:21 UTC

cassandra git commit: Workaround for output name restriction when using MultipleOutputs with CqlBulkOutputFormat

Repository: cassandra
Updated Branches:
  refs/heads/trunk cb8bda8d4 -> 7add7ead1


Workaround for output name restriction when using MultipleOutputs with CqlBulkOutputFormat

Patch by Paul Pak, reviewed by Piotr Kołaczkowski for CASSANDRA-7827


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7add7ead
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7add7ead
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7add7ead

Branch: refs/heads/trunk
Commit: 7add7ead1884325c9c648802b66af45a258104ee
Parents: cb8bda8
Author: Brandon Williams <br...@apache.org>
Authored: Mon Dec 1 12:00:28 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Dec 1 12:02:13 2014 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlBulkOutputFormat.java       | 11 +++++++++++
 .../cassandra/hadoop/cql3/CqlBulkRecordWriter.java       |  7 +++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7add7ead/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index bdc9fbf..78080e2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -54,6 +54,7 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
     private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
     private static final String DELETE_SOURCE = "cassandra.output.delete.source";
+    private static final String COLUMNFAMILY_ALIAS_PREFIX = "cqlbulkoutputformat.columnfamily.alias.";
   
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -114,4 +115,14 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     {
         return conf.getBoolean(DELETE_SOURCE, false);
     }
+    
+    public static void setColumnFamilyAlias(Configuration conf, String alias, String columnFamily)
+    {
+        conf.set(COLUMNFAMILY_ALIAS_PREFIX + alias, columnFamily);
+    }
+    
+    public static String getColumnFamilyForAlias(Configuration conf, String alias)
+    {
+        return conf.get(COLUMNFAMILY_ALIAS_PREFIX + alias);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7add7ead/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e60a240..ebae7a4 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
+
 /**
  * The <code>CqlBulkRecordWriter</code> maps the output &lt;key, value&gt;
  * pairs to a Cassandra column family. In particular, it applies the binded variables
@@ -85,6 +86,12 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         // if anything is missing, exceptions will be thrown here, instead of on write()
         keyspace = ConfigHelper.getOutputKeyspace(conf);
         columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        
+        // check if columnFamily is aliased
+        String aliasedCf = CqlBulkOutputFormat.getColumnFamilyForAlias(conf, columnFamily);
+        if (aliasedCf != null)
+            columnFamily = aliasedCf;
+        
         schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
         insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
         outputDir = getColumnFamilyDirectory();