Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/12 21:22:07 UTC
svn commit: r984921 - in /cassandra/trunk: src/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/hadoop/
Author: jbellis
Date: Thu Aug 12 19:22:07 2010
New Revision: 984921
URL: http://svn.apache.org/viewvc?rev=984921&view=rev
Log:
Revert "use Avro objects in ColumnFamilyOutputFormat."
Removed:
cassandra/trunk/src/cassandra.avpr
Modified:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Thu Aug 12 19:22:07 2010
@@ -25,10 +25,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.thrift.AuthenticationException;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.AuthorizationException;
@@ -64,11 +64,11 @@ import org.slf4j.LoggerFactory;
* <p>
* For the sake of performance, this class employs a lazy write-back caching
* mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
+ * reduce's inputs (in a task-specific map). When the writer is closed, it
+ * makes the changes official by sending a batch mutate request to Cassandra.
* </p>
*/
-public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+public class ColumnFamilyOutputFormat extends OutputFormat<byte[],List<IColumn>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
@@ -93,7 +93,15 @@ public class ColumnFamilyOutputFormat ex
}
/**
- * The OutputCommitter for this format does not write any data to the DFS.
+ * Get the output committer for this output format. This is responsible for
+ * ensuring the output is committed correctly.
+ *
+ * <p>
+ * This output format employs a lazy write-back caching mechanism, where the
+ * {@link RecordWriter} is responsible for collecting mutations in the
+ * {@link #MUTATIONS_CACHE}, and the {@link OutputCommitter} makes the
+ * changes official by making the change request to Cassandra.
+ * </p>
*
* @param context
* the task context
@@ -110,13 +118,19 @@ public class ColumnFamilyOutputFormat ex
/**
* Get the {@link RecordWriter} for the given task.
*
+ * <p>
+ * As stated above, this {@link RecordWriter} merely batches the mutations
+ * that it defines in the {@link #MUTATIONS_CACHE}. In other words, it
+ * does not itself cause any changes on the Cassandra server.
+ * </p>
+ *
* @param context
* the information about the current task.
* @return a {@link RecordWriter} to write the output for the job.
* @throws IOException
*/
@Override
- public RecordWriter<ByteBuffer,List<Mutation>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+ public RecordWriter<byte[],List<IColumn>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
{
return new ColumnFamilyRecordWriter(context);
}
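
The javadoc changes above spell out the contract this format relies on: the record writer only collects mutations in a task-specific map, and the whole batch goes to Cassandra in one shot instead of one round trip per record. Below is a minimal sketch of that lazy write-back pattern, for illustration only: the class name BatchingRecordWriter and the flush() hook are invented for the sketch (the real writer batches by endpoint, not by key), but the Hadoop RecordWriter API it extends is the one the diff uses.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    abstract class BatchingRecordWriter<K, V> extends RecordWriter<K, V>
    {
        // Task-local cache: writes accumulate here instead of going to the server.
        private final Map<K, List<V>> cache = new HashMap<K, List<V>>();

        @Override
        public void write(K key, V value) throws IOException, InterruptedException
        {
            List<V> batch = cache.get(key);
            if (batch == null)
            {
                batch = new ArrayList<V>();
                cache.put(key, batch);
            }
            batch.add(value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException
        {
            flush(cache); // one batched request instead of one per record
            cache.clear();
        }

        // Subclasses send the accumulated batch to the server in one shot.
        protected abstract void flush(Map<K, List<V>> batch) throws IOException;
    }
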
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Thu Aug 12 19:22:07 2010
@@ -22,12 +22,10 @@ package org.apache.cassandra.hadoop;
*/
import java.io.IOException;
import java.net.InetAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -35,7 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.cassandra.client.RingCache;
-import static org.apache.cassandra.io.SerDeUtils.copy;
+import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Clock;
import org.apache.cassandra.thrift.Column;
@@ -45,8 +43,6 @@ import org.apache.cassandra.thrift.Delet
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.thrift.SuperColumn;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -54,28 +50,31 @@ import org.apache.thrift.transport.TSock
/**
* The <code>ColumnFamilyRecordWriter</code> maps the output <key, value>
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
+ * pairs to a Cassandra column family. In particular, it creates mutations for
+ * each column in the value, which it associates with the key, and in turn the
+ * responsible endpoint.
*
* <p>
* Note that, given that round trips to the server are fairly expensive, it
- * merely batches the mutations in-memory and periodically sends the batched
- * mutations to the server in one shot.
+ * merely batches the mutations in-memory (specifically in
+ * {@link ColumnFamilyOutputFormat#MUTATIONS_CACHE}), and leaves it to the
+ * {@link ColumnFamilyOutputCommitter} to send the batched mutations to the
+ * server in one shot.
* </p>
*
* <p>
* Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
+ * the rows being affected. This allows the {@link ColumnFamilyOutputCommitter}
+ * to execute the mutations in parallel, on an endpoint-by-endpoint basis.
* </p>
*
* @author Karthick Sankarachary
+ * @see ColumnFamilyOutputCommitter
* @see ColumnFamilyOutputFormat
* @see OutputFormat
*
*/
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
+final class ColumnFamilyRecordWriter extends RecordWriter<byte[],List<IColumn>>
{
// The task attempt context this writer is associated with.
private final TaskAttemptContext context;
@@ -88,12 +87,12 @@ final class ColumnFamilyRecordWriter ext
// the endpoints they should be targeted at. The targeted endpoint essentially
// acts as the primary replica for the rows being affected by the mutations.
- private final RingCache ringCache;
+ private RingCache ringCache;
// The number of mutations currently held in the mutations cache.
private long batchSize = 0L;
// The maximum number of mutations to hold in the mutations cache.
- private final long batchThreshold;
+ private long batchThreshold = Long.MAX_VALUE;
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -145,92 +144,75 @@ final class ColumnFamilyRecordWriter ext
* @throws IOException
*/
@Override
- public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException, InterruptedException
+ public synchronized void write(byte[] key, List<IColumn> value) throws IOException, InterruptedException
{
maybeFlush();
- byte[] key = copy(keybuff);
InetAddress endpoint = getEndpoint(key);
Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
if (mutationsByKey == null)
{
- mutationsByKey = new TreeMap<byte[], Map<String, List<Mutation>>>(FBUtilities.byteArrayComparator);
+ mutationsByKey = new HashMap<byte[], Map<String, List<Mutation>>>();
mutationsByEndpoint.put(endpoint, mutationsByKey);
}
Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
mutationsByKey.put(key, cfMutation);
+ Clock clock = new Clock(System.currentTimeMillis());
List<Mutation> mutationList = new ArrayList<Mutation>();
cfMutation.put(ConfigHelper.getOutputColumnFamily(context.getConfiguration()), mutationList);
- for (org.apache.cassandra.avro.Mutation amut : value)
- mutationList.add(avroToThrift(amut));
- }
-
- /**
- * Deep copies the given Avro mutation into a new Thrift mutation.
- */
- private Mutation avroToThrift(org.apache.cassandra.avro.Mutation amut)
- {
- Mutation mutation = new Mutation();
- org.apache.cassandra.avro.ColumnOrSuperColumn acosc = amut.column_or_supercolumn;
- if (acosc != null)
+ if (value == null)
{
- // creation
- ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
- mutation.setColumn_or_supercolumn(cosc);
- if (acosc.column != null)
- // standard column
- cosc.setColumn(avroToThrift(acosc.column));
- else
- {
- // super column
- byte[] scolname = copy(acosc.super_column.name);
- List<Column> scolcols = new ArrayList<Column>((int)acosc.super_column.columns.size());
- for (org.apache.cassandra.avro.Column acol : acosc.super_column.columns)
- scolcols.add(avroToThrift(acol));
- cosc.setSuper_column(new SuperColumn(scolname, scolcols));
- }
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion(clock);
+ mutation.setDeletion(deletion);
+ mutationList.add(mutation);
}
else
{
- // deletion
- Deletion deletion = new Deletion(avroToThrift(amut.deletion.clock));
- mutation.setDeletion(deletion);
- org.apache.cassandra.avro.SlicePredicate apred = amut.deletion.predicate;
- if (amut.deletion.super_column != null)
- // super column
- deletion.setSuper_column(copy(amut.deletion.super_column));
- else if (apred.column_names != null)
- {
- // column names
- List<byte[]> colnames = new ArrayList<byte[]>((int)apred.column_names.size());
- for (ByteBuffer acolname : apred.column_names)
- colnames.add(copy(acolname));
- deletion.setPredicate(new SlicePredicate().setColumn_names(colnames));
- }
- else
+ List<byte[]> columnsToDelete = new ArrayList<byte[]>();
+ for (IColumn column : value)
{
- // range
- deletion.setPredicate(new SlicePredicate().setSlice_range(avroToThrift(apred.slice_range)));
+ Mutation mutation = new Mutation();
+ if (column.value() == null)
+ {
+ if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+ {
+ if (column.name() == null)
+ columnsToDelete.clear();
+ columnsToDelete.add(column.name());
+ }
+ }
+ else
+ {
+
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.setColumn(new Column(column.name(), column.value(), clock));
+ mutation.setColumn_or_supercolumn(cosc);
+ }
+ mutationList.add(mutation);
}
- }
- return mutation;
- }
- private SliceRange avroToThrift(org.apache.cassandra.avro.SliceRange asr)
- {
- return new SliceRange(copy(asr.start), copy(asr.finish), asr.reversed, asr.count);
- }
+ if (columnsToDelete.size() > 0)
+ {
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion(clock);
- private Column avroToThrift(org.apache.cassandra.avro.Column acol)
- {
- return new Column(copy(acol.name), copy(acol.value), avroToThrift(acol.clock));
- }
+ if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+ {
+ deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));
+ }
+ else
+ {
+ SliceRange range = new SliceRange(new byte[]{ }, new byte[]{ }, false, Integer.MAX_VALUE);
+ deletion.setPredicate(new SlicePredicate().setSlice_range(range));
+ }
- private Clock avroToThrift(org.apache.cassandra.avro.Clock aclo)
- {
- return new Clock(aclo.timestamp);
+ mutation.setDeletion(deletion);
+ mutationList.add(mutation);
+ }
+ }
}
/**
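
The rewritten write() method above encodes deletions through sentinels on IColumn: a column with a null value marks its name for deletion, and a lone null name escalates to deleting the whole row, expressed as an open-ended slice range. The sketch below restates that convention as a standalone helper; toMutations is hypothetical (not in the tree), it keeps only the happy paths (the diff's size() != 1 || get(0) != null guards against mixing the whole-row sentinel with named columns are dropped), and it uses only the Thrift types the diff itself imports.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.thrift.Clock;
    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.Deletion;
    import org.apache.cassandra.thrift.Mutation;
    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.thrift.SliceRange;

    public class MutationSketch
    {
        // Hypothetical helper: one row's List<IColumn> -> the Thrift mutations write() builds.
        public static List<Mutation> toMutations(List<IColumn> value)
        {
            Clock clock = new Clock(System.currentTimeMillis());
            List<Mutation> mutations = new ArrayList<Mutation>();
            List<byte[]> columnsToDelete = new ArrayList<byte[]>();

            for (IColumn column : value)
            {
                if (column.value() == null)
                {
                    // null value is the deletion sentinel; the name says which column to drop
                    columnsToDelete.add(column.name());
                }
                else
                {
                    // ordinary insert/update of a standard column
                    Mutation mutation = new Mutation();
                    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
                    cosc.setColumn(new Column(column.name(), column.value(), clock));
                    mutation.setColumn_or_supercolumn(cosc);
                    mutations.add(mutation);
                }
            }

            if (!columnsToDelete.isEmpty())
            {
                Deletion deletion = new Deletion(clock);
                if (columnsToDelete.size() == 1 && columnsToDelete.get(0) == null)
                    // a lone null name means "delete the whole row": open-ended slice range
                    deletion.setPredicate(new SlicePredicate().setSlice_range(
                            new SliceRange(new byte[]{ }, new byte[]{ }, false, Integer.MAX_VALUE)));
                else
                    deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));

                Mutation mutation = new Mutation();
                mutation.setDeletion(deletion);
                mutations.add(mutation);
            }
            return mutations;
        }
    }
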
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java Thu Aug 12 19:22:07 2010
@@ -46,14 +46,6 @@ public final class SerDeUtils
// unbuffered decoders
private final static DecoderFactory DIRECT_DECODERS = new DecoderFactory().configureDirectDecoder(true);
- public static byte[] copy(ByteBuffer buff)
- {
- byte[] bytes = new byte[buff.remaining()];
- buff.get(bytes);
- buff.rewind();
- return bytes;
- }
-
/**
* Deserializes a single object based on the given Schema.
* @param writer writer's schema
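
One aside on the deleted copy() helper: after reading the remaining bytes it called rewind(), which resets the buffer's position to zero rather than to wherever the caller left it, so it was only safe for buffers positioned at zero. Should such a utility be needed again, a duplicate()-based variant avoids touching the source buffer entirely. This is an illustration only, not part of this commit, and the class name is invented:

    import java.nio.ByteBuffer;

    public final class BufferCopySketch
    {
        // duplicate() shares the underlying bytes but carries its own position and
        // limit, so reading through it never disturbs the caller's buffer state.
        public static byte[] copy(ByteBuffer buff)
        {
            byte[] bytes = new byte[buff.remaining()];
            buff.duplicate().get(bytes);
            return bytes;
        }
    }
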
Modified: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java Thu Aug 12 19:22:07 2010
@@ -68,7 +68,6 @@ public class SampleColumnFamilyOutputToo
job.setMapOutputValueClass(ColumnWritable.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
- // TODO: no idea why this test is passing
job.setReducerClass(ColumnFamilyOutputReducer.class);
job.setOutputKeyClass(byte[].class);
job.setOutputValueClass(SortedMap.class);
@@ -77,4 +76,4 @@ public class SampleColumnFamilyOutputToo
job.waitForCompletion(true);
return 0;
}
-}
+}
\ No newline at end of file
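
For context, the test tool above wires the reverted format into a Hadoop job. Stripped to its essentials, a driver for the restored OutputFormat<byte[], List<IColumn>> signature would look roughly like the sketch below. Everything here is illustrative: MinimalOutputDriver is invented, and the commented-out ConfigHelper.setOutputColumnFamily call is an assumed setter mirroring the ConfigHelper.getOutputColumnFamily getter the record writer uses, so verify its exact signature against this revision.

    import java.util.List;
    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MinimalOutputDriver
    {
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "cassandra-output-sketch");

            // The reducer emits <byte[] row key, List<IColumn>> pairs, matching the
            // reverted OutputFormat<byte[], List<IColumn>> signature.
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            job.setOutputKeyClass(byte[].class);
            job.setOutputValueClass(List.class);

            // Assumed setter; check ConfigHelper at r984921 for the real one:
            // ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Keyspace1", "Standard1");

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }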