Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/12 21:22:07 UTC

svn commit: r984921 - in /cassandra/trunk: src/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/hadoop/

Author: jbellis
Date: Thu Aug 12 19:22:07 2010
New Revision: 984921

URL: http://svn.apache.org/viewvc?rev=984921&view=rev
Log:
Revert "use Avro objects in ColumnFamilyOutputFormat."

Removed:
    cassandra/trunk/src/cassandra.avpr
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Thu Aug 12 19:22:07 2010
@@ -25,10 +25,10 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.nio.ByteBuffer;
 
 import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.thrift.AuthenticationException;
 import org.apache.cassandra.thrift.AuthenticationRequest;
 import org.apache.cassandra.thrift.AuthorizationException;
@@ -64,11 +64,11 @@ import org.slf4j.LoggerFactory;
  * <p>
  * For the sake of performance, this class employs a lazy write-back caching
  * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
+ * reduce's inputs (in a task-specific map). When the writer is closed, it
+ * makes the changes official by sending a batch mutate request to Cassandra.
  * </p>
  */
-public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+public class ColumnFamilyOutputFormat extends OutputFormat<byte[],List<IColumn>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
     
@@ -93,7 +93,15 @@ public class ColumnFamilyOutputFormat ex
     }
     
     /**
-     * The OutputCommitter for this format does not write any data to the DFS.
+     * Get the output committer for this output format. This is responsible for
+     * ensuring the output is committed correctly.
+     * 
+     * <p>
+     * This output format employs a lazy write-back caching mechanism, where the
+     * {@link RecordWriter} is responsible for collecting mutations in the
+     * {@link #MUTATIONS_CACHE}, and the {@link OutputCommitter} makes the
+     * changes official by sending the batched mutations to Cassandra.
+     * </p>
      * 
      * @param context
      *            the task context
@@ -110,13 +118,19 @@ public class ColumnFamilyOutputFormat ex
     /**
      * Get the {@link RecordWriter} for the given task.
      * 
+     * <p>
+     * As stated above, this {@link RecordWriter} merely batches the mutations
+     * that it defines in the {@link #MUTATIONS_CACHE}. In other words, it does
+     * not itself send any changes to the Cassandra server.
+     * </p>
+     * 
      * @param context
      *            the information about the current task.
      * @return a {@link RecordWriter} to write the output for the job.
      * @throws IOException
      */
     @Override
-    public RecordWriter<ByteBuffer,List<Mutation>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    public RecordWriter<byte[],List<IColumn>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordWriter(context);
     }
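
For context on how this reverted signature is consumed: a job wires the format in the usual Hadoop way, as SampleColumnFamilyOutputTool does further down in this message. A minimal sketch under those assumptions (the class and job names are illustrative, and ColumnFamilyOutputReducer is the test-tree reducer referenced below):

    import java.util.SortedMap;

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ColumnFamilyOutputReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Illustrative job wiring for ColumnFamilyOutputFormat, mirroring the
    // SampleColumnFamilyOutputTool hunk later in this message.
    public class CfOutputJobSketch
    {
        public static Job configure(Configuration conf) throws Exception
        {
            Job job = new Job(conf, "cf-output-sketch");
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            job.setReducerClass(ColumnFamilyOutputReducer.class);
            job.setOutputKeyClass(byte[].class);
            job.setOutputValueClass(SortedMap.class);
            return job;
        }
    }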

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Thu Aug 12 19:22:07 2010
@@ -22,12 +22,10 @@ package org.apache.cassandra.hadoop;
  */
 import java.io.IOException;
 import java.net.InetAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.cassandra.client.RingCache;
-import static org.apache.cassandra.io.SerDeUtils.copy;
+import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Clock;
 import org.apache.cassandra.thrift.Column;
@@ -45,8 +43,6 @@ import org.apache.cassandra.thrift.Delet
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.thrift.SuperColumn;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -54,28 +50,31 @@ import org.apache.thrift.transport.TSock
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
+ * pairs to a Cassandra column family. In particular, it creates mutations for
+ * each column in the value, which it associates with the key and, in turn,
+ * with the endpoint responsible for that key.
  * 
  * <p>
  * Note that, given that round trips to the server are fairly expensive, it
- * merely batches the mutations in-memory and periodically sends the batched
- * mutations to the server in one shot.
+ * merely batches the mutations in-memory (specifically in
+ * {@link ColumnFamilyOutputFormat#MUTATIONS_CACHE}), and leaves it to the
+ * {@link ColumnFamilyOutputCommitter} to send the batched mutations to the
+ * server in one shot.
  * </p>
  * 
  * <p>
  * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
+ * the rows being affected. This allows the {@link ColumnFamilyOutputCommitter}
+ * to execute the mutations in parallel, on an endpoint-by-endpoint basis.
  * </p>
  * 
  * @author Karthick Sankarachary
+ * @see ColumnFamilyOutputCommitter
  * @see ColumnFamilyOutputFormat
  * @see OutputFormat
  * 
  */
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
+final class ColumnFamilyRecordWriter extends RecordWriter<byte[],List<IColumn>>
 {
     // The task attempt context this writer is associated with.
     private final TaskAttemptContext context;
@@ -88,12 +87,12 @@ final class ColumnFamilyRecordWriter ext
    // the endpoints they should be targeted at. The targeted endpoint
    // essentially acts as the primary replica for the rows being affected
    // by the mutations.
-    private final RingCache ringCache;
+    private RingCache ringCache;
     
     // The number of mutations currently held in the mutations cache.
     private long batchSize = 0L;
     // The maximum number of mutations to hold in the mutations cache.
-    private final long batchThreshold;
+    private long batchThreshold = Long.MAX_VALUE;
     
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -145,92 +144,75 @@ final class ColumnFamilyRecordWriter ext
      * @throws IOException
      */
     @Override
-    public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException, InterruptedException
+    public synchronized void write(byte[] key, List<IColumn> value) throws IOException, InterruptedException
     {
         maybeFlush();
-        byte[] key = copy(keybuff);
         InetAddress endpoint = getEndpoint(key);
         Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
         if (mutationsByKey == null)
         {
-            mutationsByKey = new TreeMap<byte[], Map<String, List<Mutation>>>(FBUtilities.byteArrayComparator);
+            mutationsByKey = new HashMap<byte[], Map<String, List<Mutation>>>();
             mutationsByEndpoint.put(endpoint, mutationsByKey);
         }
 
         Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
         mutationsByKey.put(key, cfMutation);
 
+        Clock clock = new Clock(System.currentTimeMillis());
         List<Mutation> mutationList = new ArrayList<Mutation>();
         cfMutation.put(ConfigHelper.getOutputColumnFamily(context.getConfiguration()), mutationList);
 
-        for (org.apache.cassandra.avro.Mutation amut : value)
-            mutationList.add(avroToThrift(amut));
-    }
-
-    /**
-     * Deep copies the given Avro mutation into a new Thrift mutation.
-     */
-    private Mutation avroToThrift(org.apache.cassandra.avro.Mutation amut)
-    {
-        Mutation mutation = new Mutation();
-        org.apache.cassandra.avro.ColumnOrSuperColumn acosc = amut.column_or_supercolumn;
-        if (acosc != null)
+        if (value == null)
         {
-            // creation
-            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
-            mutation.setColumn_or_supercolumn(cosc);
-            if (acosc.column != null)
-                // standard column
-                cosc.setColumn(avroToThrift(acosc.column));
-            else
-            {
-                // super column
-                byte[] scolname = copy(acosc.super_column.name);
-                List<Column> scolcols = new ArrayList<Column>((int)acosc.super_column.columns.size());
-                for (org.apache.cassandra.avro.Column acol : acosc.super_column.columns)
-                    scolcols.add(avroToThrift(acol));
-                cosc.setSuper_column(new SuperColumn(scolname, scolcols));
-            }
+            Mutation mutation = new Mutation();
+            Deletion deletion = new Deletion(clock);
+            mutation.setDeletion(deletion);
+            mutationList.add(mutation);
         }
         else
         {
-            // deletion
-            Deletion deletion = new Deletion(avroToThrift(amut.deletion.clock));
-            mutation.setDeletion(deletion);
-            org.apache.cassandra.avro.SlicePredicate apred = amut.deletion.predicate;
-            if (amut.deletion.super_column != null)
-                // super column
-                deletion.setSuper_column(copy(amut.deletion.super_column));
-            else if (apred.column_names != null)
-            {
-                // column names
-                List<byte[]> colnames = new ArrayList<byte[]>((int)apred.column_names.size());
-                for (ByteBuffer acolname : apred.column_names)
-                    colnames.add(copy(acolname));
-                deletion.setPredicate(new SlicePredicate().setColumn_names(colnames));
-            }
-            else
+            List<byte[]> columnsToDelete = new ArrayList<byte[]>();
+            for (IColumn column : value)
             {
-                // range
-                deletion.setPredicate(new SlicePredicate().setSlice_range(avroToThrift(apred.slice_range)));
+                Mutation mutation = new Mutation();
+                if (column.value() == null)
+                {
+                    // a null column name is a sentinel meaning "delete the
+                    // whole row"; once the list holds only that sentinel,
+                    // individual column deletes are subsumed by it
+                    if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+                    {
+                        if (column.name() == null)
+                            columnsToDelete.clear();
+                        columnsToDelete.add(column.name());
+                    }
+                }
+                else
+                {
+                    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+                    cosc.setColumn(new Column(column.name(), column.value(), clock));
+                    mutation.setColumn_or_supercolumn(cosc);
+                }
+                mutationList.add(mutation);
             }
-        }
-        return mutation;
-    }
 
-    private SliceRange avroToThrift(org.apache.cassandra.avro.SliceRange asr)
-    {
-        return new SliceRange(copy(asr.start), copy(asr.finish), asr.reversed, asr.count);
-    }
+            if (columnsToDelete.size() > 0)
+            {
+                Mutation mutation = new Mutation();
+                Deletion deletion = new Deletion(clock);
 
-    private Column avroToThrift(org.apache.cassandra.avro.Column acol)
-    {
-        return new Column(copy(acol.name), copy(acol.value), avroToThrift(acol.clock));
-    }
+                if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+                {
+                    // delete the named columns
+                    deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));
+                }
+                else
+                {
+                    // the [null] sentinel: delete the entire row via an open-ended slice
+                    SliceRange range = new SliceRange(new byte[]{ }, new byte[]{ }, false, Integer.MAX_VALUE);
+                    deletion.setPredicate(new SlicePredicate().setSlice_range(range));
+                }
 
-    private Clock avroToThrift(org.apache.cassandra.avro.Clock aclo)
-    {
-        return new Clock(aclo.timestamp);
+                mutation.setDeletion(deletion);
+                mutationList.add(mutation);
+            }
+        }
     }
 
     /**
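
To restate the conventions the new write() encodes: a column whose value is null marks that column for deletion, a null column name widens the deletion to the whole row, and anything else becomes an insertion. A condensed, self-contained sketch of that branching for a single (name, value) pair (the class and method names are illustrative; the Thrift calls are the same ones used in the hunk above):

    import java.util.Arrays;

    import org.apache.cassandra.thrift.Clock;
    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.Deletion;
    import org.apache.cassandra.thrift.Mutation;
    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.thrift.SliceRange;

    public final class MutationSketch
    {
        // One (name, value) pair in, one Thrift Mutation out; either
        // array may be null, with the meanings described above.
        static Mutation toMutation(byte[] name, byte[] value)
        {
            Clock clock = new Clock(System.currentTimeMillis());
            Mutation mutation = new Mutation();
            if (value != null)
            {
                // insertion: wrap the column in a ColumnOrSuperColumn
                ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
                cosc.setColumn(new Column(name, value, clock));
                mutation.setColumn_or_supercolumn(cosc);
            }
            else
            {
                Deletion deletion = new Deletion(clock);
                if (name != null)
                    // delete just this column, by name
                    deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(name)));
                else
                    // null name: delete the entire row via an open-ended slice
                    deletion.setPredicate(new SlicePredicate().setSlice_range(
                            new SliceRange(new byte[0], new byte[0], false, Integer.MAX_VALUE)));
                mutation.setDeletion(deletion);
            }
            return mutation;
        }
    }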

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java Thu Aug 12 19:22:07 2010
@@ -46,14 +46,6 @@ public final class SerDeUtils
     // unbuffered decoders
     private final static DecoderFactory DIRECT_DECODERS = new DecoderFactory().configureDirectDecoder(true);
 
-    public static byte[] copy(ByteBuffer buff)
-    {
-        byte[] bytes = new byte[buff.remaining()];
-        buff.get(bytes);
-        buff.rewind();
-        return bytes;
-    }
-
 	/**
      * Deserializes a single object based on the given Schema.
      * @param writer writer's schema
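
An aside on the copy() helper removed above: it read from the buffer's current position and then rewound to position zero, so it only round-trips cleanly for buffers whose position starts at zero. If a caller elsewhere still needs the conversion, a position-preserving variant is sketched here (the class and method names are illustrative):

    import java.nio.ByteBuffer;

    public final class BufferSketch
    {
        // Position-preserving take on the removed SerDeUtils.copy():
        // duplicate() creates an independent cursor over the same bytes,
        // leaving the caller's position and limit untouched.
        static byte[] copyRemaining(ByteBuffer buff)
        {
            byte[] bytes = new byte[buff.remaining()];
            buff.duplicate().get(bytes);
            return bytes;
        }
    }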

Modified: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=984921&r1=984920&r2=984921&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java Thu Aug 12 19:22:07 2010
@@ -68,7 +68,6 @@ public class SampleColumnFamilyOutputToo
         job.setMapOutputValueClass(ColumnWritable.class);
         job.setInputFormatClass(SequenceFileInputFormat.class);
         
-        // TODO: no idea why this test is passing
         job.setReducerClass(ColumnFamilyOutputReducer.class);
         job.setOutputKeyClass(byte[].class);
         job.setOutputValueClass(SortedMap.class);
@@ -77,4 +76,4 @@ public class SampleColumnFamilyOutputToo
         job.waitForCompletion(true);
         return 0;
     }
-}
+}
\ No newline at end of file
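
The tool's entry point shown above (configuring a Job and calling waitForCompletion before returning 0) matches the org.apache.hadoop.util.Tool pattern, so the sample can be driven through ToolRunner. A hypothetical driver, assuming the tool implements Tool and has a no-argument constructor:

    import org.apache.cassandra.hadoop.SampleColumnFamilyOutputTool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    // Hypothetical driver class; not part of this commit.
    public class SampleDriver
    {
        public static void main(String[] args) throws Exception
        {
            System.exit(ToolRunner.run(new Configuration(), new SampleColumnFamilyOutputTool(), args));
        }
    }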