Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/12 21:13:10 UTC
svn commit: r984917 - in /cassandra/trunk: src/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/hadoop/
Author: jbellis
Date: Thu Aug 12 19:13:09 2010
New Revision: 984917
URL: http://svn.apache.org/viewvc?rev=984917&view=rev
Log:
use Avro objects in ColumnFamilyOutputFormat. patch by Stu Hood; reviewed by jbellis for CASSANDRA-1315
Added:
cassandra/trunk/src/cassandra.avpr
Modified:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
Added: cassandra/trunk/src/cassandra.avpr
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/cassandra.avpr?rev=984917&view=auto
==============================================================================
--- cassandra/trunk/src/cassandra.avpr (added)
+++ cassandra/trunk/src/cassandra.avpr Thu Aug 12 19:13:09 2010
@@ -0,0 +1,202 @@
+{
+ "protocol" : "InterNode",
+ "namespace" : "org.apache.cassandra",
+ "types" : [ {
+ "type" : "fixed",
+ "name" : "UUID",
+ "namespace" : "org.apache.cassandra.utils.avro",
+ "size" : 16
+ }, {
+ "type" : "enum",
+ "name" : "IndexType",
+ "namespace" : "org.apache.cassandra.config.avro",
+ "symbols" : [ "KEYS" ]
+ }, {
+ "type" : "record",
+ "name" : "ColumnDef",
+ "namespace" : "org.apache.cassandra.config.avro",
+ "fields" : [ {
+ "name" : "name",
+ "type" : "bytes"
+ }, {
+ "name" : "validation_class",
+ "type" : "string"
+ }, {
+ "name" : "index_type",
+ "type" : [ "org.apache.cassandra.config.avro.IndexType", "null" ]
+ }, {
+ "name" : "index_name",
+ "type" : [ "string", "null" ]
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "CfDef",
+ "namespace" : "org.apache.cassandra.config.avro",
+ "fields" : [ {
+ "name" : "keyspace",
+ "type" : "string"
+ }, {
+ "name" : "name",
+ "type" : "string"
+ }, {
+ "name" : "column_type",
+ "type" : [ "string", "null" ]
+ }, {
+ "name" : "clock_type",
+ "type" : [ "string", "null" ]
+ }, {
+ "name" : "comparator_type",
+ "type" : [ "string", "null" ]
+ }, {
+ "name" : "subcomparator_type",
+ "type" : [ "string", "null" ]
+ }, {
+ "name" : "reconciler",
+ "type" : [ "string", "null" ]
+ }, {
+ "name" : "comment",
+ "type" : [ "string", "null" ]
+ }, {
+ "name" : "row_cache_size",
+ "type" : [ "double", "null" ]
+ }, {
+ "name" : "preload_row_cache",
+ "type" : [ "boolean", "null" ]
+ }, {
+ "name" : "key_cache_size",
+ "type" : [ "double", "null" ]
+ }, {
+ "name" : "read_repair_chance",
+ "type" : [ "double", "null" ]
+ }, {
+ "name" : "gc_grace_seconds",
+ "type" : [ "int", "null" ]
+ }, {
+ "name" : "column_metadata",
+ "type" : [ {
+ "type" : "array",
+ "items" : "org.apache.cassandra.config.avro.ColumnDef"
+ }, "null" ]
+ }, {
+ "name" : "id",
+ "type" : [ "int", "null" ]
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "KsDef",
+ "namespace" : "org.apache.cassandra.config.avro",
+ "fields" : [ {
+ "name" : "name",
+ "type" : "string"
+ }, {
+ "name" : "strategy_class",
+ "type" : "string"
+ }, {
+ "name" : "strategy_options",
+ "type" : [ {
+ "type" : "map",
+ "values" : "string"
+ }, "null" ]
+ }, {
+ "name" : "replication_factor",
+ "type" : "int"
+ }, {
+ "name" : "cf_defs",
+ "type" : {
+ "type" : "array",
+ "items" : "org.apache.cassandra.config.avro.CfDef"
+ }
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "AddColumnFamily",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "cf",
+ "type" : "org.apache.cassandra.config.avro.CfDef"
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "AddKeyspace",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "ks",
+ "type" : "org.apache.cassandra.config.avro.KsDef"
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "DropColumnFamily",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "ksname",
+ "type" : "string"
+ }, {
+ "name" : "cfname",
+ "type" : "string"
+ }, {
+ "name" : "block_on_deletion",
+ "type" : "boolean"
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "DropKeyspace",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "ksname",
+ "type" : "string"
+ }, {
+ "name" : "block_on_deletion",
+ "type" : "boolean"
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "RenameColumnFamily",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "ksname",
+ "type" : "string"
+ }, {
+ "name" : "cfid",
+ "type" : "int"
+ }, {
+ "name" : "old_cfname",
+ "type" : "string"
+ }, {
+ "name" : "new_cfname",
+ "type" : "string"
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "RenameKeyspace",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "old_ksname",
+ "type" : "string"
+ }, {
+ "name" : "new_ksname",
+ "type" : "string"
+ } ]
+ }, {
+ "type" : "record",
+ "name" : "Migration",
+ "namespace" : "org.apache.cassandra.db.migration.avro",
+ "fields" : [ {
+ "name" : "old_version",
+ "type" : "org.apache.cassandra.utils.avro.UUID"
+ }, {
+ "name" : "new_version",
+ "type" : "org.apache.cassandra.utils.avro.UUID"
+ }, {
+ "name" : "row_mutation",
+ "type" : "bytes"
+ }, {
+ "name" : "classname",
+ "type" : "string"
+ }, {
+ "name" : "migration",
+ "type" : [ "org.apache.cassandra.db.migration.avro.AddColumnFamily", "org.apache.cassandra.db.migration.avro.DropColumnFamily", "org.apache.cassandra.db.migration.avro.RenameColumnFamily", "org.apache.cassandra.db.migration.avro.AddKeyspace", "org.apache.cassandra.db.migration.avro.DropKeyspace", "org.apache.cassandra.db.migration.avro.RenameKeyspace" ]
+ } ]
+ } ],
+ "messages" : {
+ }
+}
\ No newline at end of file
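For readers unfamiliar with .avpr files: the protocol above can be loaded and inspected with the stock Avro library. A minimal sketch, assuming the repository-relative path above and an Avro release of roughly this era (the Protocol/Schema calls shown are long-stable API):

    import java.io.File;
    import org.apache.avro.Protocol;
    import org.apache.avro.Schema;

    public class InspectCassandraAvpr
    {
        public static void main(String[] args) throws Exception
        {
            // Parse the protocol file added by this commit
            Protocol p = Protocol.parse(new File("src/cassandra.avpr"));
            // Look up one of its record schemas by fully-qualified name
            Schema cfDef = p.getType("org.apache.cassandra.config.avro.CfDef");
            // Pretty-print the schema as JSON
            System.out.println(cfDef.toString(true));
        }
    }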
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=984917&r1=984916&r2=984917&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Thu Aug 12 19:13:09 2010
@@ -25,10 +25,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.nio.ByteBuffer;
import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.thrift.AuthenticationException;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.AuthorizationException;
@@ -64,11 +64,11 @@ import org.slf4j.LoggerFactory;
* <p>
* For the sake of performance, this class employs a lazy write-back caching
* mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map). When the writer is closed, then it
- * makes the changes official by sending a batch mutate request to Cassandra.
+ * reduce's inputs (in a task-specific map), and periodically makes the changes
+ * official by sending a batch mutate request to Cassandra.
* </p>
*/
-public class ColumnFamilyOutputFormat extends OutputFormat<byte[],List<IColumn>>
+public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
@@ -93,15 +93,7 @@ public class ColumnFamilyOutputFormat ex
}
/**
- * Get the output committer for this output format. This is responsible for
- * ensuring the output is committed correctly.
- *
- * <p>
- * This output format employs a lazy write-back caching mechanism, where the
- * {@link RecordWriter} is responsible for collecting mutations in the
- * {@link #MUTATIONS_CACHE}, and the {@link OutputCommitter} makes the
- * changes official by making the change request to Cassandra.
- * </p>
+ * The OutputCommitter for this format does not write any data to the DFS.
*
* @param context
* the task context
@@ -118,19 +110,13 @@ public class ColumnFamilyOutputFormat ex
/**
* Get the {@link RecordWriter} for the given task.
*
- * <p>
- * As stated above, this {@link RecordWriter} merely batches the mutations
- * that it defines in the {@link #MUTATIONS_CACHE}. In other words, it
- * doesn't literally cause any changes on the Cassandra server.
- * </p>
- *
* @param context
* the information about the current task.
* @return a {@link RecordWriter} to write the output for the job.
* @throws IOException
*/
@Override
- public RecordWriter<byte[],List<IColumn>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+ public RecordWriter<ByteBuffer,List<Mutation>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
{
return new ColumnFamilyRecordWriter(context);
}
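With the generic parameters changed to <ByteBuffer, List<Mutation>>, a job using this format now declares ByteBuffer keys and List values. A hedged sketch of the wiring; the ConfigHelper setter name and arguments are assumptions mirroring the getter the record writer calls below:

    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class OutputFormatWiring
    {
        public static Job configure(Configuration conf) throws Exception
        {
            Job job = new Job(conf, "write-to-cassandra");
            // Reducer output types must match the format's new generic signature:
            // keys are row keys, values are lists of Avro Mutations.
            job.setOutputKeyClass(ByteBuffer.class);
            job.setOutputValueClass(List.class);
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            // Assumed setter; the writer reads this back via
            // ConfigHelper.getOutputColumnFamily(context.getConfiguration()).
            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Keyspace1", "Standard1");
            return job;
        }
    }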
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=984917&r1=984916&r2=984917&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Thu Aug 12 19:13:09 2010
@@ -22,10 +22,12 @@ package org.apache.cassandra.hadoop;
*/
import java.io.IOException;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -33,7 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.db.IColumn;
+import static org.apache.cassandra.io.SerDeUtils.copy;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Clock;
import org.apache.cassandra.thrift.Column;
@@ -43,6 +45,8 @@ import org.apache.cassandra.thrift.Delet
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -50,31 +54,28 @@ import org.apache.thrift.transport.TSock
/**
* The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it creates mutations for
- * each column in the value, which it associates with the key, and in turn the
- * responsible endpoint.
+ * pairs to a Cassandra column family. In particular, it applies all mutations
+ * in the value, which it associates with the key, and in turn the responsible
+ * endpoint.
*
* <p>
* Note that, given that round trips to the server are fairly expensive, it
- * merely batches the mutations in-memory (specifically in
- * {@link ColumnFamilyOutputFormat#MUTATIONS_CACHE}), and leaves it to the
- * {@link ColumnFamilyOutputCommitter} to send the batched mutations to the
- * server in one shot.
+ * merely batches the mutations in-memory and periodically sends the batched
+ * mutations to the server in one shot.
* </p>
*
* <p>
* Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the {@link ColumnFamilyOutputCommitter}
- * to execute the mutations in parallel, on a endpoint-by-endpoint basis.
+ * the rows being affected. This allows the mutations to be executed in parallel,
+ * directly to a responsible endpoint.
* </p>
*
* @author Karthick Sankarachary
- * @see ColumnFamilyOutputCommitter
* @see ColumnFamilyOutputFormat
* @see OutputFormat
*
*/
-final class ColumnFamilyRecordWriter extends RecordWriter<byte[],List<IColumn>>
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
{
// The task attempt context this writer is associated with.
private final TaskAttemptContext context;
@@ -87,12 +88,12 @@ final class ColumnFamilyRecordWriter ext
// the endpoints they should be targeted at. The targeted endpoint
// essentially
// acts as the primary replica for the rows being affected by the mutations.
- private RingCache ringCache;
+ private final RingCache ringCache;
// The number of mutations currently held in the mutations cache.
private long batchSize = 0L;
// The maximum number of mutations to hold in the mutations cache.
- private long batchThreshold = Long.MAX_VALUE;
+ private final long batchThreshold;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -144,75 +145,92 @@ final class ColumnFamilyRecordWriter ext
* @throws IOException
*/
@Override
- public synchronized void write(byte[] key, List<IColumn> value) throws IOException, InterruptedException
+ public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException, InterruptedException
{
maybeFlush();
+ byte[] key = copy(keybuff);
InetAddress endpoint = getEndpoint(key);
Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
if (mutationsByKey == null)
{
- mutationsByKey = new HashMap<byte[], Map<String, List<Mutation>>>();
+ mutationsByKey = new TreeMap<byte[], Map<String, List<Mutation>>>(FBUtilities.byteArrayComparator);
mutationsByEndpoint.put(endpoint, mutationsByKey);
}
Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
mutationsByKey.put(key, cfMutation);
- Clock clock = new Clock(System.currentTimeMillis());
List<Mutation> mutationList = new ArrayList<Mutation>();
cfMutation.put(ConfigHelper.getOutputColumnFamily(context.getConfiguration()), mutationList);
- if (value == null)
+ for (org.apache.cassandra.avro.Mutation amut : value)
+ mutationList.add(avroToThrift(amut));
+ }
+
+ /**
+ * Deep copies the given Avro mutation into a new Thrift mutation.
+ */
+ private Mutation avroToThrift(org.apache.cassandra.avro.Mutation amut)
+ {
+ Mutation mutation = new Mutation();
+ org.apache.cassandra.avro.ColumnOrSuperColumn acosc = amut.column_or_supercolumn;
+ if (acosc != null)
{
- Mutation mutation = new Mutation();
- Deletion deletion = new Deletion(clock);
- mutation.setDeletion(deletion);
- mutationList.add(mutation);
+ // creation
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ mutation.setColumn_or_supercolumn(cosc);
+ if (acosc.column != null)
+ // standard column
+ cosc.setColumn(avroToThrift(acosc.column));
+ else
+ {
+ // super column
+ byte[] scolname = copy(acosc.super_column.name);
+ List<Column> scolcols = new ArrayList<Column>((int)acosc.super_column.columns.size());
+ for (org.apache.cassandra.avro.Column acol : acosc.super_column.columns)
+ scolcols.add(avroToThrift(acol));
+ cosc.setSuper_column(new SuperColumn(scolname, scolcols));
+ }
}
else
{
- List<byte[]> columnsToDelete = new ArrayList<byte[]>();
- for (IColumn column : value)
+ // deletion
+ Deletion deletion = new Deletion(avroToThrift(amut.deletion.clock));
+ mutation.setDeletion(deletion);
+ org.apache.cassandra.avro.SlicePredicate apred = amut.deletion.predicate;
+ if (amut.deletion.super_column != null)
+ // super column
+ deletion.setSuper_column(copy(amut.deletion.super_column));
+ else if (apred.column_names != null)
{
- Mutation mutation = new Mutation();
- if (column.value() == null)
- {
- if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
- {
- if (column.name() == null)
- columnsToDelete.clear();
- columnsToDelete.add(column.name());
- }
- }
- else
- {
-
- ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
- cosc.setColumn(new Column(column.name(), column.value(), clock));
- mutation.setColumn_or_supercolumn(cosc);
- }
- mutationList.add(mutation);
+ // column names
+ List<byte[]> colnames = new ArrayList<byte[]>((int)apred.column_names.size());
+ for (ByteBuffer acolname : apred.column_names)
+ colnames.add(copy(acolname));
+ deletion.setPredicate(new SlicePredicate().setColumn_names(colnames));
}
-
- if (columnsToDelete.size() > 0)
+ else
{
- Mutation mutation = new Mutation();
- Deletion deletion = new Deletion(clock);
-
- if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
- {
- deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));
- }
- else
- {
- SliceRange range = new SliceRange(new byte[]{ }, new byte[]{ }, false, Integer.MAX_VALUE);
- deletion.setPredicate(new SlicePredicate().setSlice_range(range));
- }
-
- mutation.setDeletion(deletion);
- mutationList.add(mutation);
+ // range
+ deletion.setPredicate(new SlicePredicate().setSlice_range(avroToThrift(apred.slice_range)));
}
}
+ return mutation;
+ }
+
+ private SliceRange avroToThrift(org.apache.cassandra.avro.SliceRange asr)
+ {
+ return new SliceRange(copy(asr.start), copy(asr.finish), asr.reversed, asr.count);
+ }
+
+ private Column avroToThrift(org.apache.cassandra.avro.Column acol)
+ {
+ return new Column(copy(acol.name), copy(acol.value), avroToThrift(acol.clock));
+ }
+
+ private Clock avroToThrift(org.apache.cassandra.avro.Clock aclo)
+ {
+ return new Clock(aclo.timestamp);
}
/**
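Producers now feed this writer Avro objects, which avroToThrift() deep-copies into Thrift equivalents at flush time. A minimal sketch of building an insertion-style Avro Mutation in the shape the writer expects; the field names and public-field access are inferred from how the generated classes are used in the diff above:

    import java.nio.ByteBuffer;
    import org.apache.cassandra.avro.Clock;
    import org.apache.cassandra.avro.Column;
    import org.apache.cassandra.avro.ColumnOrSuperColumn;
    import org.apache.cassandra.avro.Mutation;

    public class Mutations
    {
        // An insertion: column_or_supercolumn set, deletion left null,
        // matching the branch avroToThrift() takes when acosc != null.
        public static Mutation insertion(byte[] name, byte[] value, long timestamp)
        {
            Column col = new Column();
            col.name = ByteBuffer.wrap(name);
            col.value = ByteBuffer.wrap(value);
            col.clock = new Clock();
            col.clock.timestamp = timestamp;

            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
            cosc.column = col;

            Mutation mutation = new Mutation();
            mutation.column_or_supercolumn = cosc;
            return mutation;
        }
    }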
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java?rev=984917&r1=984916&r2=984917&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java Thu Aug 12 19:13:09 2010
@@ -46,6 +46,14 @@ public final class SerDeUtils
// unbuffered decoders
private final static DecoderFactory DIRECT_DECODERS = new DecoderFactory().configureDirectDecoder(true);
+ public static byte[] copy(ByteBuffer buff)
+ {
+ byte[] bytes = new byte[buff.remaining()];
+ buff.get(bytes);
+ buff.rewind();
+ return bytes;
+ }
+
/**
* Deserializes a single object based on the given Schema.
* @param writer writer's schema
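Note the semantics of the new copy() helper: it drains the buffer's remaining bytes and then rewinds, so a buffer whose position started at zero can be copied repeatedly. A small demonstration using only the code shown above:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import static org.apache.cassandra.io.SerDeUtils.copy;

    public class CopyDemo
    {
        public static void main(String[] args)
        {
            ByteBuffer buff = ByteBuffer.wrap(new byte[]{ 1, 2, 3 });
            byte[] a = copy(buff); // reads the 3 remaining bytes
            byte[] b = copy(buff); // rewind() restored position 0, so this reads them again
            assert Arrays.equals(a, b);
            // Caveat: rewind() resets to position 0, not to the pre-call position,
            // so a buffer handed over with a nonzero starting position is disturbed.
        }
    }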
Modified: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=984917&r1=984916&r2=984917&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java Thu Aug 12 19:13:09 2010
@@ -68,6 +68,7 @@ public class SampleColumnFamilyOutputToo
job.setMapOutputValueClass(ColumnWritable.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
+ // TODO: no idea why this test is passing
job.setReducerClass(ColumnFamilyOutputReducer.class);
job.setOutputKeyClass(byte[].class);
job.setOutputValueClass(SortedMap.class);
@@ -76,4 +77,4 @@ public class SampleColumnFamilyOutputToo
job.waitForCompletion(true);
return 0;
}
-}
\ No newline at end of file
+}
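The TODO added above is plausibly explained by the surrounding lines: the job still declares byte[] keys and SortedMap values, while ColumnFamilyOutputFormat is now parameterized over <ByteBuffer, List<Mutation>>. Hadoop does not check the declared output classes against the format's generics at compile time, so the mismatch goes unnoticed unless the writer is actually exercised. A hedged guess at the two settings a converted test would use in place of the byte[]/SortedMap pair:

    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class); // List<org.apache.cassandra.avro.Mutation>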