You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/02/03 01:11:22 UTC
[2/2] git commit: BulkOutputFormat infers CF/column types from first mutation. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-3828
BulkOutputFormat infers CF/column types from first mutation.
Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-3828
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35aad40b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35aad40b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35aad40b
Branch: refs/heads/trunk
Commit: 35aad40b05442b4c827cf8d373c65954d4b36748
Parents: cba4087
Author: Brandon Williams <br...@apache.org>
Authored: Thu Feb 2 18:00:08 2012 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Feb 2 18:00:08 2012 -0600
----------------------------------------------------------------------
.../apache/cassandra/hadoop/BulkRecordWriter.java | 84 +++++++++++----
1 files changed, 65 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/35aad40b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index c8a3a4f..573ec8f 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -29,6 +29,7 @@ import java.net.UnknownHostException;
import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.Range;
@@ -52,11 +53,25 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
- private final static String IS_SUPERCF = "mapreduce.output.bulkoutputformat.issuper";
private final Configuration conf;
- private boolean isSuper = false;
private SSTableSimpleUnsortedWriter writer;
private SSTableLoader loader;
+ private File outputdir;
+
+ private enum CFType
+ {
+ NORMAL,
+ SUPER,
+ }
+
+ private enum ColType
+ {
+ NORMAL,
+ COUNTER
+ }
+
+ private CFType cfType;
+ private ColType colType;
static {
DatabaseDescriptor.initDefaultsOnly(); // make sure DD doesn't load yaml
@@ -72,21 +87,8 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
this.conf = conf;
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0")));
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- File outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader
+ outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader
outputdir.mkdirs();
- this.isSuper = Boolean.valueOf(conf.get(IS_SUPERCF));
- AbstractType<?> subcomparator = null;
- if (isSuper)
- subcomparator = BytesType.instance;
- this.writer = new SSTableSimpleUnsortedWriter(
- outputdir,
- keyspace,
- ConfigHelper.getOutputColumnFamily(conf),
- BytesType.instance,
- subcomparator,
- Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
-
- this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
}
private String getOutputLocation() throws IOException
@@ -97,21 +99,65 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
return dir;
}
+ private void setTypes(Mutation mutation)
+ {
+ if (cfType == null)
+ {
+ if (mutation.getColumn_or_supercolumn().isSetSuper_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
+ cfType = CFType.SUPER;
+ else
+ cfType = CFType.NORMAL;
+ if (mutation.getColumn_or_supercolumn().isSetCounter_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
+ colType = ColType.COUNTER;
+ else
+ colType = ColType.NORMAL;
+ }
+ }
+
+ private void prepareWriter() throws IOException
+ {
+ if (writer == null)
+ {
+ AbstractType<?> subcomparator = null;
+ if (cfType == CFType.SUPER)
+ subcomparator = BytesType.instance;
+ this.writer = new SSTableSimpleUnsortedWriter(
+ outputdir,
+ ConfigHelper.getOutputKeyspace(conf),
+ ConfigHelper.getOutputColumnFamily(conf),
+ BytesType.instance,
+ subcomparator,
+ Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")));
+ this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
+ }
+ }
@Override
public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
{
+ setTypes(value.get(0));
+ prepareWriter();
writer.newRow(keybuff);
for (Mutation mut : value)
{
- if (isSuper)
+ if (cfType == CFType.SUPER)
{
writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
- writer.addColumn(column.name, column.value, column.timestamp);
+ {
+ if (colType == ColType.COUNTER)
+ writer.addCounterColumn(column.name, column.value.getLong());
+ else
+ writer.addColumn(column.name, column.value, column.timestamp);
+ }
}
else
- writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+ {
+ if (colType == ColType.COUNTER)
+ writer.addCounterColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value.getLong());
+ else
+ writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+ }
}
}