You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/16 20:56:57 UTC

svn commit: r955347 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/hadoop/ test/unit/org/apache/cassandra/ test/unit/org/apache/cassandra/hadoop/

Author: jbellis
Date: Wed Jun 16 18:56:57 2010
New Revision: 955347

URL: http://svn.apache.org/viewvc?rev=955347&view=rev
Log:
hadoop outputformat.  patch by Karthick Sankarachary; reviewed by Stu Hood and jbellis for CASSSANDRA-1101

Added:
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java
    cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java
    cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
    cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/ivy.xml
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jun 16 18:56:57 2010
@@ -28,6 +28,7 @@ dev
    entries spanning segment boundaries, with SegmentedFile that computes 
    segments that always contain entire entries/rows (CASSANDRA-1117)
  * avoid reading large rows into memory during compaction (CASSANDRA-16)
+ * added hadoop OutputFormat (CASSANDRA-1101)
 
 
 0.6.3

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Jun 16 18:56:57 2010
@@ -14,6 +14,7 @@ Features
       ConsitencyLevel.DCQUORUM and DCQUORUMSYNC.  See comments in
       `cassandra.yaml.`
     - row size limit increased from 2GB to 2 billion columns
+    - Hadoop OutputFormat support
 
 Configuraton
 ------------

Modified: cassandra/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/ivy.xml?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/ivy.xml (original)
+++ cassandra/trunk/ivy.xml Wed Jun 16 18:56:57 2010
@@ -25,6 +25,7 @@
     <dependency org="com.thoughtworks.paranamer"
                 name="paranamer-ant" rev="2.1"/>
     <dependency org="junit" name="junit" rev="4.6" />
+    <dependency org="commons-logging" name="commons-logging" rev="1.1.1"/>
     <dependency org="org.apache.rat" name="apache-rat" rev="0.6" />
     <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
   </dependencies>

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=955347&r1=955346&r2=955347&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Jun 16 18:56:57 2010
@@ -62,7 +62,6 @@ import org.apache.thrift.transport.TTran
  */
 public class ColumnFamilyInputFormat extends InputFormat<byte[], SortedMap<byte[], IColumn>>
 {
-
     private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
     
     private int splitsize;

Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,252 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.AllowAllAuthenticator;
+import org.apache.cassandra.auth.SimpleAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+
+/**
+ * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * values) as Cassandra rows (and respective columns) in a given
+ * {@link ColumnFamily}.
+ * 
+ * <p>
+ * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
+ * CF and predicate (description of columns to extract from each row) in your
+ * Hadoop job Configuration. The {@link ConfigHelper} class, through its
+ * {@link ConfigHelper#setColumnFamily} and
+ * {@link ConfigHelper#setSlicePredicate} methods, is provided to make this
+ * simple.
+ * </p>
+ * 
+ * <p>
+ * By default, it prevents overwriting existing rows in the column family, by
+ * ensuring at initialization time that it contains no rows in the given slice
+ * predicate. For the sake of performance, it employs a lazy write-back caching
+ * mechanism, where its record writer batches mutations created based on the
+ * reduce's inputs (in a task-specific map). When the writer is closed, then it
+ * makes the changes official by sending a batch mutate request to Cassandra.
+ * </p>
+ * 
+ * @author Karthick Sankarachary
+ */
+public class ColumnFamilyOutputFormat extends OutputFormat<byte[],List<IColumn>>
+{
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
+    
+    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+    
+    /**
+     * Check for validity of the output-specification for the job.
+     * 
+     * <p>
+     * This is to validate the output specification for the job when it is a job
+     * is submitted. By default, it will prevent writes to the given column
+     * family, if it already contains one or more rows in the given slice
+     * predicate. If you wish to relax that restriction, you may override this
+     * method is a sub-class of your choosing.
+     * </p>
+     * 
+     * @param context
+     *            information about the job
+     * @throws IOException
+     *             when output should not be attempted
+     */
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
+    {
+        validateConfiguration(context.getConfiguration());
+        String keyspace = ConfigHelper.getKeyspace(context.getConfiguration());
+        String columnFamily = ConfigHelper.getColumnFamily(context.getConfiguration());
+        SlicePredicate slicePredicate = ConfigHelper.getSlicePredicate(context.getConfiguration());
+        assert slicePredicate != null;
+        if (slicePredicate.column_names == null && slicePredicate.slice_range == null)
+            slicePredicate = slicePredicate.setColumn_names(new ArrayList<byte[]>());
+
+        List<KeySlice> keySlices;
+        try
+        {
+            TSocket socket = new TSocket(DatabaseDescriptor.getListenAddress().getHostName(), DatabaseDescriptor.getRpcPort());
+            Cassandra.Client client = createAuthenticatedClient(socket, context);
+            ColumnParent parent = new ColumnParent().setColumn_family(columnFamily);
+            KeyRange range = new KeyRange().setStart_key("".getBytes()).setEnd_key("".getBytes());
+            keySlices = client.get_range_slices(parent, slicePredicate, range, ConsistencyLevel.ONE);
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e);
+        }
+        if (keySlices.size() > 0)
+        {
+            throw new IOException("The column family " + columnFamily
+                                  + " in the keyspace " + keyspace + " already has "
+                                  + keySlices.size() + " keys in the slice predicate "
+                                  + slicePredicate);
+        }
+    }
+    
+    /**
+     * Get the output committer for this output format. This is responsible for
+     * ensuring the output is committed correctly.
+     * 
+     * <p>
+     * This output format employs a lazy write-back caching mechanism, where the
+     * {@link RecordWriter} is responsible for collecting mutations in the
+     * {@link #MUTATIONS_CACHE}, and the {@link OutputCommitter} makes the
+     * changes official by making the change request to Cassandra.
+     * </p>
+     * 
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+    
+    /**
+     * Get the {@link RecordWriter} for the given task.
+     * 
+     * <p>
+     * As stated above, this {@link RecordWriter} merely batches the mutations
+     * that it defines in the {@link #MUTATIONS_CACHE}. In other words, it
+     * doesn't literally cause any changes on the Cassandra server.
+     * </p>
+     * 
+     * @param context
+     *            the information about the current task.
+     * @return a {@link RecordWriter} to write the output for the job.
+     * @throws IOException
+     */
+    @Override
+    public RecordWriter<byte[],List<IColumn>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new ColumnFamilyRecordWriter(context);
+    }
+    
+    /**
+     * Ensure that this output format has been configured correctly, with a
+     * valid keyspace, column family and slice predicate.
+     * 
+     * @param conf
+     */
+    public void validateConfiguration(Configuration conf)
+    {
+        if (ConfigHelper.getKeyspace(conf) == null || ConfigHelper.getColumnFamily(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
+        }
+        if (ConfigHelper.getSlicePredicate(conf) == null)
+        {
+            System.out.println("Since no slice predicate was specified, all columns in "
+                               + ConfigHelper.getColumnFamily(conf)
+                               + " will be overwritten");
+        }
+    }
+
+    /**
+     * Return a client based on the given socket that points to the configured
+     * keyspace, and is logged in with the configured credentials.
+     *
+     * @param socket  a socket pointing to a particular node, seed or otherwise
+     * @param context a job context
+     * @return a cassandra client
+     * @throws InvalidRequestException
+     * @throws TException
+     * @throws AuthenticationException
+     * @throws AuthorizationException
+     */
+    public static Cassandra.Client createAuthenticatedClient(TSocket socket, JobContext context)
+    throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
+    {
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        socket.open();
+        client.set_keyspace(ConfigHelper.getKeyspace(context.getConfiguration()));
+        Map<String, String> creds = new HashMap<String, String>();
+        creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getKeyspaceUserName(context.getConfiguration()));
+        creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getKeyspacePassword(context.getConfiguration()));
+        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+        if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
+            client.login(authRequest);
+        return client;
+
+    }
+
+    /**
+     * An {@link OutputCommitter} that does nothing.
+     */
+    public class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputReducer.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,65 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IColumn;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * The <code>ColumnFamilyOutputReducer</code> reduces a &lt;key, values&gt;
+ * pair, where the value is a generic iterable type, into a list of columns that
+ * need to be mutated for that key, where each column corresponds to an element
+ * in the value.
+ * 
+ * <p>
+ * The default implementation treats the VALUEIN type to be a
+ * {@link ColumnWritable}, in which case this reducer acts as an identity
+ * function.
+ * 
+ * @author Karthick Sankarachary
+ * 
+ * @param <KEYIN>
+ */
+public class ColumnFamilyOutputReducer<KEYIN, VALUEIN> extends Reducer<KEYIN, VALUEIN, byte[], List<IColumn>>
+{
+    public void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException
+    {
+        byte[] cfKey = key.toString().getBytes();
+        List<IColumn> cfColumns = new ArrayList<IColumn>();
+        for (VALUEIN value : values)
+        {
+            ColumnWritable columnValue = map(value);
+            cfColumns.add(new Column(columnValue.getName(), columnValue.getValue()));
+        }
+        context.write(cfKey, cfColumns);
+    }
+
+    protected ColumnWritable map(VALUEIN value)
+    {
+        return (ColumnWritable) value;
+    }
+}
\ No newline at end of file

Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,343 @@
+package org.apache.cassandra.hadoop;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.client.RingCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Clock;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.transport.TSocket;
+
+/**
+ * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
+ * pairs to a Cassandra column family. In particular, it creates mutations for
+ * each column in the value, which it associates with the key, and in turn the
+ * responsible endpoint.
+ * 
+ * <p>
+ * Note that, given that round trips to the server are fairly expensive, it
+ * merely batches the mutations in-memory (specifically in
+ * {@link ColumnFamilyOutputFormat#MUTATIONS_CACHE}), and leaves it to the
+ * {@link ColumnFamilyOutputCommitter} to send the batched mutations to the
+ * server in one shot.
+ * </p>
+ * 
+ * <p>
+ * Furthermore, this writer groups the mutations by the endpoint responsible for
+ * the rows being affected. This allows the {@link ColumnFamilyOutputCommitter}
+ * to execute the mutations in parallel, on a endpoint-by-endpoint basis.
+ * </p>
+ * 
+ * @author Karthick Sankarachary
+ * @see ColumnFamilyOutputCommitter
+ * @see ColumnFamilyOutputFormat
+ * @see OutputFormat
+ * 
+ */
+final class ColumnFamilyRecordWriter extends RecordWriter<byte[],List<IColumn>>
+{
+    // The task attempt context this writer is associated with.
+    private final TaskAttemptContext context;
+    
+    // The batched set of mutations grouped by endpoints.
+    private Map<InetAddress,Map<byte[],Map<String,List<Mutation>>>> mutationsByEndpoint;
+    
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by
+    // the endpoints they should be targeted at. The targeted endpoint
+    // essentially
+    // acts as the primary replica for the rows being affected by the mutations.
+    private RingCache ringCache;
+    
+    // The number of mutations currently held in the mutations cache.
+    private long batchSize = 0L;
+    // The maximum number of mutations to hold in the mutations cache.
+    private long batchThreshold = Long.MAX_VALUE;
+    
+    /**
+     * Upon construction, obtain the map that this writer will use to collect
+     * mutations, and the ring cache for the given keyspace.
+     * 
+     * @param context the task attempt context
+     * @throws IOException
+     */
+    ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
+    {
+        this.context = context;
+        this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
+        this.ringCache = new RingCache(ConfigHelper.getKeyspace(context.getConfiguration()));
+        this.batchThreshold = context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
+    }
+    
+    /**
+     * Return the endpoint responsible for the given key. The selected endpoint
+     * one whose token range contains the given key.
+     * 
+     * @param key
+     *            the key being mutated
+     * @return the endpoint responsible for that key
+     */
+    protected InetAddress getEndpoint(byte[] key)
+    {
+        List<InetAddress> endpoints = ringCache.getEndpoint(key);
+        return endpoints != null && endpoints.size() > 0
+               ? endpoints.get(0)
+               : null;
+    }
+
+    /**
+     * Writes a key/value pair, not to the Cassandra server, but into a
+     * in-memory cache (viz. {@link #mutationsByEndpoint}.
+     * 
+     * <p>
+     * If the key is to be associated with a valid value, a mutation is created
+     * for it with the given column family and columns. In the event the value
+     * in the column is missing (i.e., null), then it is marked for
+     * {@link Deletion}. Similarly, if the entire value for a key is missing
+     * (i.e., null), then the entire key is marked for {@link Deletion}.
+     * </p>
+     * 
+     * @param key
+     *            the key to write.
+     * @param value
+     *            the value to write.
+     * @throws IOException
+     */
+    @Override
+    public synchronized void write(byte[] key, List<IColumn> value) throws IOException, InterruptedException
+    {
+        maybeFlush();
+        InetAddress endpoint = getEndpoint(key);
+        Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
+        if (mutationsByKey == null)
+        {
+            mutationsByKey = new HashMap<byte[], Map<String, List<Mutation>>>();
+            mutationsByEndpoint.put(endpoint, mutationsByKey);
+        }
+
+        Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
+        mutationsByKey.put(key, cfMutation);
+
+        Clock clock = new Clock(System.currentTimeMillis());
+        List<Mutation> mutationList = new ArrayList<Mutation>();
+        cfMutation.put(ConfigHelper.getColumnFamily(context.getConfiguration()), mutationList);
+
+        if (value == null)
+        {
+            Mutation mutation = new Mutation();
+            Deletion deletion = new Deletion(clock);
+            mutation.setDeletion(deletion);
+            mutationList.add(mutation);
+        }
+        else
+        {
+            List<byte[]> columnsToDelete = new ArrayList<byte[]>();
+            for (IColumn column : value)
+            {
+                Mutation mutation = new Mutation();
+                if (column.value() == null)
+                {
+                    if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+                    {
+                        if (column.name() == null)
+                            columnsToDelete.clear();
+                        columnsToDelete.add(column.name());
+                    }
+                }
+                else
+                {
+
+                    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+                    cosc.setColumn(new Column(column.name(), column.value(), clock));
+                    mutation.setColumn_or_supercolumn(cosc);
+                }
+                mutationList.add(mutation);
+            }
+
+            if (columnsToDelete.size() > 0)
+            {
+                Mutation mutation = new Mutation();
+                Deletion deletion = new Deletion(clock);
+
+                if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
+                {
+                    deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));
+                }
+                else
+                {
+                    SliceRange range = new SliceRange(new byte[]{ }, new byte[]{ }, false, Integer.MAX_VALUE);
+                    deletion.setPredicate(new SlicePredicate().setSlice_range(range));
+                }
+
+                mutation.setDeletion(deletion);
+                mutationList.add(mutation);
+            }
+        }
+    }
+
+    /**
+     * Close this <code>RecordWriter</code> to future operations, but not before
+     * flushing out the batched mutations.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        flush();
+    }
+
+    /**
+     * Flush the mutations cache, iff more mutations have been cached than
+     * {@link #batchThreshold}.
+     *
+     * @throws IOException
+     */
+    private void maybeFlush() throws IOException
+    {
+        if (++batchSize > batchThreshold)
+        {
+            flush();
+            batchSize = 0L;
+        }
+    }
+
+    /**
+     * Send the batched mutations over to Cassandra, and then clear the
+     * mutations cache.
+     *
+     * @throws IOException
+     */
+    protected synchronized void flush() throws IOException
+    {
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        try
+        {
+            List<Future<?>> mutationFutures = new ArrayList<Future<?>>();
+            for (Map.Entry<InetAddress, Map<byte[], Map<String, List<Mutation>>>> entry : mutationsByEndpoint.entrySet())
+            {
+                mutationFutures.add(executor.submit(new EndpointCallable(context, entry.getKey(), entry.getValue())));
+            }
+            // wait until we have all the results back
+            for (Future<?> mutationFuture : mutationFutures)
+            {
+                try
+                {
+                    mutationFuture.get();
+                }
+                catch (ExecutionException e)
+                {
+                    throw new IOException("Could not perform endpoint mutations", e.getCause());
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
+        finally
+        {
+            executor.shutdownNow();
+            mutationsByEndpoint.clear();
+        }
+
+    }
+
+    /**
+     * The <code>EndpointCallable</code> facilitates an asynchronous call to a
+     * specific node in the ring that commands it to perform a batched set of
+     * mutations. Needless to say, the given mutations are targeted at rows that
+     * the selected endpoint is responsible for (i.e., is the primary replica
+     * for).
+     */
+    public class EndpointCallable implements Callable<Void>
+    {
+        // The task attempt context associated with this callable.
+        private TaskAttemptContext taskContext;
+        // The endpoint of the primary replica for the rows being mutated
+        private InetAddress endpoint;
+        // The mutations to be performed in the node referenced by {@link
+        // #endpoint}.
+        private Map<byte[], Map<String, List<Mutation>>> mutations;
+
+        /**
+         * Constructs an {@link EndpointCallable} for the given endpoint and set
+         * of mutations.
+         *
+         * @param endpoint  the endpoint wherein to execute the mutations
+         * @param mutations the mutation map expected by
+         *                  {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}
+         */
+        public EndpointCallable(TaskAttemptContext taskContext, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
+        {
+            this.taskContext = taskContext;
+            this.endpoint = endpoint;
+            this.mutations = mutations;
+        }
+
+        /**
+         * Perform the call to
+         * {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}.
+         */
+        public Void call() throws Exception
+        {
+            TSocket socket = null;
+            try
+            {
+                socket = new TSocket(endpoint.getHostName(), DatabaseDescriptor.getRpcPort());
+                Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, taskContext);
+                client.batch_mutate(mutations, ConsistencyLevel.ONE);
+                return null;
+            }
+            finally
+            {
+                if (socket != null)
+                    socket.close();
+            }
+        }
+    }
+
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnWritable.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,113 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The <code>ColumnWritable</code> is a {@link WritableComparable} that denotes
+ * a column name and value.
+ */
+public class ColumnWritable implements WritableComparable<ColumnWritable>
+{
+    // A comparator that checks if two byte arrays are the same or not.
+    public static final Comparator<byte[]> BYTE_ARRAY_COMPARATOR = new Comparator<byte[]>()
+    {
+        public int compare(byte[] o1, byte[] o2)
+        {
+            return FBUtilities.compareByteArrays(o1, o2);
+        }
+    };
+    
+    // The name and value of the column this writable denotes.
+    private byte[] name, value;
+    
+    public ColumnWritable(byte[] name, byte[] value)
+    {
+        setName(name);
+        setValue(value);
+    }
+    
+    public byte[] getValue()
+    {
+        return value;
+    }
+    
+    public void setValue(byte[] value)
+    {
+        this.value = value;
+    }
+    
+    public byte[] getName()
+    {
+        return name;
+    }
+    
+    public void setName(byte[] name)
+    {
+        this.name = name;
+    }
+    
+    public void readFields(DataInput in) throws IOException
+    {
+        name = FBUtilities.readByteArray(in);
+        value = FBUtilities.readByteArray(in);
+    }
+    
+    public void write(DataOutput out) throws IOException
+    {
+        FBUtilities.writeByteArray(name, out);
+        FBUtilities.writeByteArray(value, out);
+    }
+    
+    /** Returns true iff <code>o</code> is a ColumnWritable with the same value. */
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof ColumnWritable))
+            return false;
+        ColumnWritable that = (ColumnWritable) o;
+        return compareTo(that) == 0;
+    }
+    
+    public int hashCode()
+    {
+        return name.hashCode() + value.hashCode();
+    }
+    
+    /** Compares two ColumnWritables. */
+    public int compareTo(ColumnWritable o)
+    {
+        ColumnWritable that = (ColumnWritable) o;
+        int nameComparison = BYTE_ARRAY_COMPARATOR.compare(this.name, that.name);
+        if (nameComparison != 0)
+            return nameComparison;
+        return BYTE_ARRAY_COMPARATOR.compare(this.value, that.value);
+    }
+    
+    public String toString()
+    {
+        return "{ " + name.toString() + " : " + value.toString() + " }";
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/EmbeddedServer.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.service.CassandraDaemon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class EmbeddedServer extends CleanupHelper
+{
+    protected static CassandraDaemon daemon = null;
+    
+    enum GatewayService
+    {
+        Thrift, Avro
+    }
+    
+    public static GatewayService getDaemonGatewayService()
+    {
+        return GatewayService.Thrift;
+    }
+    
+    static ExecutorService executor = Executors.newSingleThreadExecutor();
+    
+    @BeforeClass
+    public static void startCassandra() throws IOException
+
+    {
+        executor.submit(new Runnable()
+        {
+            public void run()
+            {
+                switch (getDaemonGatewayService())
+                {
+                    case Avro:
+                        daemon = new org.apache.cassandra.avro.CassandraDaemon();
+                        break;
+                    case Thrift:
+                    default:
+                        daemon = new org.apache.cassandra.thrift.CassandraDaemon();
+                }
+                daemon.activate();
+            }
+        });
+        try
+        {
+            TimeUnit.SECONDS.sleep(3);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+    
+    @AfterClass
+    public static void stopCassandra() throws Exception
+    {
+        if (daemon != null)
+        {
+            daemon.deactivate();
+        }
+        executor.shutdown();
+        executor.shutdownNow();
+    }
+    
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/ColumnFamilyOutputFormatTest.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,222 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.EmbeddedServer;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A test case for the {@link ColumnFamilyOutputFormat}, which reads each
+ * &lt;key, value&gt; pair from a sequence file, maps them to a &lt;key,
+ * column&gt; pair, and then reduces it by aggregating the columns that
+ * correspond to the same key. Finally, the output &lt;key, columns&gt; pairs
+ * are written into the (Cassandra) column family associated with this
+ * {@link OutputFormat}.
+ * 
+ * @author Karthick Sankarachary
+ * 
+ */
+
+public class ColumnFamilyOutputFormatTest extends EmbeddedServer
+{
+    static final String KEYSPACE = "ColumnFamilyOutputFormatTestKeyspace";
+    static final String COLUMN_FAMILY = "outputColumnFamily";
+
+    private static final String INPUT_FOLDER = "columnfamily.outputtest";
+
+    private static final String INPUT_FILE = "rows.txt";
+
+    private static final int NUMBER_OF_ROWS = 3;
+    private static final int NUMBER_OF_COLUMNS = 4;
+
+    List<Integer> rowKeys;
+    List<Integer> columnValues;
+
+    private static FileSystem fs;
+
+    static
+    {
+        try
+        {
+            fs = FileSystem.getLocal(new Configuration());
+        }
+        catch (IOException ioe)
+        {
+            throw new IOError(ioe);
+        }
+    }
+
+    private Cassandra.Client thriftClient;
+
+    @Before
+    public void setup() throws Exception
+    {
+        setupCassandra();
+        defineRows();
+        createInputFile();
+    }
+
+    @After
+    public void tearDown() throws Exception
+    {
+        deleteInputFile();
+        deleteRows();
+    }
+
+    private void deleteRows() throws InvalidRequestException, TException
+    {
+        thriftClient.system_drop_column_family(COLUMN_FAMILY);
+    }
+
+    private void deleteInputFile() throws IOException
+    {
+        inputdir = new Path(INPUT_FOLDER);
+        if (!fs.delete(inputdir, true))
+            throw new IOException("Delete failed to remove " + inputdir.toString());
+    }
+
+    private void setupCassandra() throws TException, InvalidRequestException
+    {
+        /* Establish a thrift connection to the cassandra instance */
+        TSocket socket = new TSocket(DatabaseDescriptor.getListenAddress().getHostName(), DatabaseDescriptor.getRpcPort());
+        TTransport transport;
+        transport = socket;
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, false, false);
+        Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
+        transport.open();
+        thriftClient = cassandraClient;
+        Set<String> keyspaces = thriftClient.describe_keyspaces();
+        if (!keyspaces.contains(KEYSPACE))
+        {
+            List<CfDef> cfDefs = new ArrayList<CfDef>();
+            thriftClient.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefs));
+        }
+        thriftClient.set_keyspace(KEYSPACE);
+
+        CfDef cfDef = new CfDef(KEYSPACE, COLUMN_FAMILY);
+        try
+        {
+            thriftClient.system_add_column_family(cfDef);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    private void defineRows()
+    {
+        rowKeys = new ArrayList<Integer>();
+        columnValues = new ArrayList<Integer>();
+        for (int key = 0; key < NUMBER_OF_ROWS; key++)
+        {
+            for (int columnValue = 0; columnValue < NUMBER_OF_COLUMNS; columnValue++)
+            {
+                rowKeys.add(key);
+                columnValues.add(columnValue);
+            }
+        }
+    }
+
+    Path inputdir;
+
+    private void createInputFile() throws IOException
+    {
+        inputdir = new Path(INPUT_FOLDER);
+        if (!fs.mkdirs(inputdir))
+        {
+            throw new IOException("Mkdirs failed to create " + inputdir.toString());
+        }
+
+        Path inputRows = new Path(inputdir, INPUT_FILE);
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, fs.getConf(), inputRows, IntWritable.class, IntWritable.class);
+        // OutputStream os = fs.create(inputRows);
+        // Writer writer = new OutputStreamWriter(os);
+        for (int keyValuePair = 0; keyValuePair < NUMBER_OF_ROWS * NUMBER_OF_COLUMNS; keyValuePair++)
+            writer.append(new IntWritable(rowKeys.get(keyValuePair)), new IntWritable(columnValues.get(keyValuePair)));
+        writer.close();
+    }
+
+    @Test
+    public void testCopyCF() throws Exception
+    {
+        ToolRunner.run(new Configuration(), new SampleColumnFamilyOutputTool(inputdir, COLUMN_FAMILY), new String[] {});
+        verifyOutput();
+
+    }
+
+    public void verifyOutput() throws Exception
+    {
+        List<KeySlice> keySlices = thriftClient.get_range_slices(new ColumnParent().setColumn_family(COLUMN_FAMILY),
+                                                                 new SlicePredicate().setColumn_names(new ArrayList<byte[]>()),
+                                                                 new KeyRange().setStart_key("".getBytes()).setEnd_key("".getBytes()),
+                                                                 ConsistencyLevel.ONE);
+        for (KeySlice keySlice : keySlices)
+        {
+            Integer key = Integer.parseInt(new String(keySlice.getKey()));
+            List<Integer> columnValues = new ArrayList<Integer>();
+            for (ColumnOrSuperColumn cosc : keySlice.getColumns())
+                columnValues.add(Integer.parseInt(new String(cosc.getColumn().getValue())));
+            verifyKeyValues(key, columnValues);
+        }
+    }
+
+    private void verifyKeyValues(Integer key, List<Integer> columnValues) throws Exception
+    {
+        List<Integer> outputColumnValues = new ArrayList<Integer>();
+        for (int i = 0; i < rowKeys.size(); i++)
+        {
+            if (rowKeys.get(i).equals(key))
+                outputColumnValues.add(this.columnValues.get(i));
+        }
+        columnValues.removeAll(outputColumnValues);
+        assert columnValues.isEmpty() : "Some of the input columns could not be found in the column family";
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,80 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.SortedMap;
+
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.thrift.TException;
+
+/**
+ * The <code>SampleColumnFamilyOutputTool</code> provides a tool interface which
+ * runs a {@link SampleColumnMapper} on the &lt;key, value&gt; pairs obtained
+ * from a sequence file, and then reduces it through the default
+ * {@link ColumnFamilyOutputReducer}.
+ * 
+ * @author Karthick Sankarachary
+ * 
+ */
+public class SampleColumnFamilyOutputTool extends Configured implements Tool
+{
+    private Path inputdir;
+    
+    public SampleColumnFamilyOutputTool(Path inputdir, String columnFamily)
+    {
+        this.inputdir = inputdir;
+    }
+    
+    public int run(String[] args)
+    throws InvalidRequestException, TException, IOException, InterruptedException, ClassNotFoundException
+    {
+        Job job = new Job(new Configuration());
+        
+        // In case your job runs out of memory, use this setting 
+        // (provided you're on Hadoop 0.20.1 or later)
+        // job.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
+        ConfigHelper.setColumnFamily(job.getConfiguration(),
+                                     ColumnFamilyOutputFormatTest.KEYSPACE,
+                                     ColumnFamilyOutputFormatTest.COLUMN_FAMILY);
+        ConfigHelper.setSlicePredicate(job.getConfiguration(), new SlicePredicate());
+
+        SequenceFileInputFormat.addInputPath(job, inputdir);
+        
+        job.setMapperClass(SampleColumnMapper.class);
+        job.setMapOutputKeyClass(IntWritable.class);
+        job.setMapOutputValueClass(ColumnWritable.class);
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        
+        job.setReducerClass(ColumnFamilyOutputReducer.class);
+        job.setOutputKeyClass(byte[].class);
+        job.setOutputValueClass(SortedMap.class);
+        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
+        
+        job.waitForCompletion(true);
+        return 0;
+    }
+}
\ No newline at end of file

Added: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java?rev=955347&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnMapper.java Wed Jun 16 18:56:57 2010
@@ -0,0 +1,37 @@
+package org.apache.cassandra.hadoop;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A sample mapper that takes a pair of input &lt;key, value&gt; (writable)
+ * integers, and writes them as &lt;key, column&gt; writables.
+ */
+public class SampleColumnMapper extends Mapper<IntWritable,IntWritable,IntWritable,ColumnWritable>
+{
+    protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException
+    {
+        byte[] columnNameAndValue = String.valueOf(value.get()).getBytes();
+        context.write(key, new ColumnWritable(columnNameAndValue, columnNameAndValue));
+    }
+}
\ No newline at end of file