You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/03 01:09:15 UTC

git commit: BulkOutputFormat infers CF/column types from first mutation. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-3828

Updated Branches:
  refs/heads/cassandra-1.1 cba4087cf -> 35aad40b0


BulkOutputFormat infers CF/column types from first mutation.
Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-3828


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35aad40b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35aad40b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35aad40b

Branch: refs/heads/cassandra-1.1
Commit: 35aad40b05442b4c827cf8d373c65954d4b36748
Parents: cba4087
Author: Brandon Williams <br...@apache.org>
Authored: Thu Feb 2 18:00:08 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Feb 2 18:00:08 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |   84 +++++++++++----
 1 file changed, 65 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/35aad40b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index c8a3a4f..573ec8f 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -29,6 +29,7 @@ import java.net.UnknownHostException;
 import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.Range;
@@ -52,11 +53,25 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
     private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
     private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    private final static String IS_SUPERCF = "mapreduce.output.bulkoutputformat.issuper";
     private final Configuration conf;
-    private boolean isSuper = false;
     private SSTableSimpleUnsortedWriter writer;
     private SSTableLoader loader;
+    private File outputdir;
+
+    private enum CFType
+    {
+        NORMAL,
+        SUPER,
+    }
+
+    private enum ColType
+    {
+        NORMAL,
+        COUNTER
+    }
+
+    private CFType cfType;
+    private ColType colType;
 
     static {
         DatabaseDescriptor.initDefaultsOnly(); // make sure DD doesn't load yaml
@@ -72,21 +87,8 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         this.conf = conf;
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0")));
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
-        File outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader
+        outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader
         outputdir.mkdirs();
-        this.isSuper = Boolean.valueOf(conf.get(IS_SUPERCF));
-        AbstractType<?> subcomparator = null;
-        if (isSuper)
-            subcomparator = BytesType.instance;
-        this.writer = new SSTableSimpleUnsortedWriter(
-                outputdir,
-                keyspace,
-                ConfigHelper.getOutputColumnFamily(conf),
-                BytesType.instance,
-                subcomparator,
-                Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
-
-        this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
     }
 
     private String getOutputLocation() throws IOException
@@ -97,21 +99,65 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         return dir;
     }
 
+    private void setTypes(Mutation mutation)
+    {
+       if (cfType == null)
+       {
+           if (mutation.getColumn_or_supercolumn().isSetSuper_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
+               cfType = CFType.SUPER;
+           else
+               cfType = CFType.NORMAL;
+           if (mutation.getColumn_or_supercolumn().isSetCounter_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
+               colType = ColType.COUNTER;
+           else
+               colType = ColType.NORMAL;
+       }
+    }
+
+    private void prepareWriter() throws IOException
+    {
+        if (writer == null)
+        {
+            AbstractType<?> subcomparator = null;
+            if (cfType == CFType.SUPER)
+                subcomparator = BytesType.instance;
+            this.writer = new SSTableSimpleUnsortedWriter(
+                    outputdir,
+                    ConfigHelper.getOutputKeyspace(conf),
+                    ConfigHelper.getOutputColumnFamily(conf),
+                    BytesType.instance,
+                    subcomparator,
+                    Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
+            this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
+        }
+    }
 
     @Override
     public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
     {
+        setTypes(value.get(0));
+        prepareWriter();
         writer.newRow(keybuff);
         for (Mutation mut : value)
         {
-            if (isSuper)
+            if (cfType == CFType.SUPER)
             {
                 writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
                 for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
-                   writer.addColumn(column.name, column.value, column.timestamp);
+                {
+                    if (colType == ColType.COUNTER)
+                        writer.addCounterColumn(column.name, column.value.getLong());
+                    else
+                        writer.addColumn(column.name, column.value, column.timestamp);
+                }
             }
             else
-                writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+            {
+                if (colType == ColType.COUNTER)
+                    writer.addCounterColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value.getLong());
+                else
+                    writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+            }
         }
     }