Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/29 19:41:41 UTC

svn commit: r1208021 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/hadoop/

Author: brandonwilliams
Date: Tue Nov 29 18:41:38 2011
New Revision: 1208021

URL: http://svn.apache.org/viewvc?rev=1208021&view=rev
Log:
Add old-style API support to CFIF and CFRR.
Patch by Steeve Morin, reviewed by brandonwilliams for CASSANDRA-2799
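
For context: ColumnFamilyInputFormat (CFIF) and ColumnFamilyRecordReader (CFRR) previously supported only the new org.apache.hadoop.mapreduce API; this patch lets jobs written against the deprecated org.apache.hadoop.mapred API read from Cassandra as well. Below is a minimal sketch of such an old-style job. The RowCountMapper class, the output path, and the exact ConfigHelper calls are illustrative assumptions (they follow the 1.0-era word_count example; helper names changed in later releases), not part of this patch:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.lib.LongSumReducer;

    public class OldApiRowCount
    {
        // old-style mapper: emits ("rows", 1) for every input row
        public static class RowCountMapper extends MapReduceBase
                implements Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
        {
            public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
                            OutputCollector<Text, LongWritable> output, Reporter reporter)
                    throws IOException
            {
                output.collect(new Text("rows"), new LongWritable(1));
            }
        }

        public static void main(String[] args) throws IOException
        {
            JobConf job = new JobConf(OldApiRowCount.class);
            // this now compiles: CFIF implements org.apache.hadoop.mapred.InputFormat
            job.setInputFormat(ColumnFamilyInputFormat.class);
            job.setMapperClass(RowCountMapper.class);
            job.setReducerClass(LongSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(job, new Path("/tmp/rowcount"));

            // hypothetical keyspace/column family and connection settings
            ConfigHelper.setInputColumnFamily(job, "Keyspace1", "Standard1");
            ConfigHelper.setRpcPort(job, "9160");
            ConfigHelper.setInitialAddress(job, "localhost");
            ConfigHelper.setPartitioner(job, "org.apache.cassandra.dht.RandomPartitioner");
            ConfigHelper.setInputSlicePredicate(job, new SlicePredicate()
                    .setColumn_names(Arrays.asList(ByteBufferUtil.bytes("name"))));

            JobClient.runJob(job);
        }
    }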

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Nov 29 18:41:38 2011
@@ -1,7 +1,12 @@
-1.0.5
+1.0.6
  * add command to stop compactions (CASSANDRA-1740)
  * filter out unavailable cipher suites when using encryption (CASSANDRA-3178)
- * fix assertion error when forwarding to local nodes (CASSANDRA-3539)
+ * (HADOOP) add old-style api support for CFIF and CFRR (CASSANDRA-2799)
+
+1.0.5
+ * revert CASSANDRA-3407 (see CASSANDRA-3540)
+ * fix assertion error while forwarding writes to local nodes (CASSANDRA-3539)
+
 
 1.0.4
  * fix self-hinting of timed out read repair updates and make hinted handoff

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Nov 29 18:41:38 2011
@@ -44,11 +44,14 @@ import org.apache.cassandra.thrift.KeyRa
 import org.apache.cassandra.thrift.TokenRange;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,9 +74,18 @@ import org.slf4j.LoggerFactory;
  * The default split size is 64k rows.
  */
 public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+    implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
 
+    public static final String MAPRED_TASK_ID = "mapred.task.id";
+    // We need this because the old Hadoop API asks us to "write" into a
+    // caller-supplied key and value, whereas the new API hands them to us.
+    // The default max key size is 8KB (the buffer is instantiated only once),
+    // but you can override it in your JobConf with this setting.
+    public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
+    public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
+
     private String keyspace;
     private String cfName;
 
@@ -262,10 +274,39 @@ public class ColumnFamilyInputFormat ext
         return map;
     }
 
-
-
     public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
+
+
+    //
+    // Old Hadoop API
+    //
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
+    {
+        TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
+        List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
+        org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
+        for (int i = 0; i < newInputSplits.size(); i++)
+            oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
+        return oldInputSplits;
+    }
+
+    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+    {
+        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
+        {
+            @Override
+            public void progress()
+            {
+                reporter.progress();
+            }
+        };
+
+        ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
+        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
+        return recordReader;
+    }
+
 }
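
One practical consequence of the old-style support: createKey() pre-allocates a fixed key buffer, so jobs whose row keys can exceed the 8KB default must raise the limit before submitting. A one-line sketch, assuming jobConf is the JobConf from your driver (the 64KB value is only an example):

    // raise the reusable key buffer from the 8KB default; keys longer than this
    // would overflow the buffer handed to next(key, value)
    jobConf.setInt(ColumnFamilyInputFormat.CASSANDRA_HADOOP_MAX_KEY_SIZE, 64 * 1024);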

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Nov 29 18:41:38 2011
@@ -51,7 +51,10 @@ import org.apache.thrift.transport.TFram
 import org.apache.thrift.transport.TSocket;
 
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
 {
+    public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
+
     private ColumnFamilySplit split;
     private RowIterator iter;
     private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
@@ -64,6 +67,18 @@ public class ColumnFamilyRecordReader ex
     private TSocket socket;
     private Cassandra.Client client;
     private ConsistencyLevel consistencyLevel;
+    private int keyBufferSize = 8192;
+
+    public ColumnFamilyRecordReader()
+    {
+        this(ColumnFamilyRecordReader.CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT);
+    }
+
+    public ColumnFamilyRecordReader(int keyBufferSize)
+    {
+        super();
+        this.keyBufferSize = keyBufferSize;
+    }
 
     public void close() 
     {
@@ -387,4 +402,41 @@ public class ColumnFamilyRecordReader ex
             return sc;
         }
     }
+
+
+    // Because the old Hadoop API requires us to write into caller-supplied key
+    // and value objects while the new API returns them, we copy the new API's
+    // output into the old API's buffers, so expect a small performance hit.
+    // This copying would not work for wide rows, but since ColumnFamilyInputFormat
+    // and ColumnFamilyRecordReader don't support them, it is fine for now.
+    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, IColumn> value) throws IOException
+    {
+        if (this.nextKeyValue())
+        {
+            key.clear();
+            key.put(this.getCurrentKey());
+            key.rewind();
+
+            value.clear();
+            value.putAll(this.getCurrentValue());
+
+            return true;
+        }
+        return false;
+    }
+
+    public ByteBuffer createKey()
+    {
+        return ByteBuffer.wrap(new byte[this.keyBufferSize]);
+    }
+
+    public SortedMap<ByteBuffer, IColumn> createValue()
+    {
+        return new TreeMap<ByteBuffer, IColumn>();
+    }
+
+    public long getPos() throws IOException
+    {
+        return (long)iter.rowsRead();
+    }
 }
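
To make the copy semantics concrete, this is roughly how an old-API caller (MapTask, or your own code driving the reader) consumes CFRR; a sketch, not code from this patch, with inputFormat, split, jobConf, and reporter assumed to be in scope:

    org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reader =
            inputFormat.getRecordReader(split, jobConf, reporter);

    // the caller allocates the key and value exactly once...
    ByteBuffer key = reader.createKey();                   // keyBufferSize-byte buffer
    SortedMap<ByteBuffer, IColumn> value = reader.createValue();

    // ...and next() copies each row into them, unlike the new API's
    // getCurrentKey()/getCurrentValue(), which simply return references
    while (reader.next(key, value))
    {
        // key now holds the row key, value the columns of that row
    }
    reader.close();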

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Tue Nov 29 18:41:38 2011
@@ -21,15 +21,15 @@ package org.apache.cassandra.hadoop;
  */
 
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-public class ColumnFamilySplit extends InputSplit implements Writable
+public class ColumnFamilySplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit
 {
     private String startToken;
     private String endToken;
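
The split change works because the two APIs' method names line up: the abstract org.apache.hadoop.mapreduce.InputSplit and the org.apache.hadoop.mapred.InputSplit interface both declare getLength() and getLocations(), and an override that throws at most IOException satisfies both contracts (mapred.InputSplit also extends Writable, which ColumnFamilySplit already implements). A hypothetical minimal class showing the pattern, not taken from the patch:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class DualSplit extends org.apache.hadoop.mapreduce.InputSplit
            implements org.apache.hadoop.mapred.InputSplit
    {
        // one pair of overrides serves both the new abstract class and the old
        // interface, since neither throws more than the contracts allow
        @Override
        public long getLength() { return 0; }

        @Override
        public String[] getLocations() { return new String[0]; }

        // Writable methods, required by mapred.InputSplit
        public void write(DataOutput out) throws IOException {}
        public void readFields(DataInput in) throws IOException {}
    }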