You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/29 19:41:41 UTC
svn commit: r1208021 - in /cassandra/branches/cassandra-1.0: ./
src/java/org/apache/cassandra/hadoop/
Author: brandonwilliams
Date: Tue Nov 29 18:41:38 2011
New Revision: 1208021
URL: http://svn.apache.org/viewvc?rev=1208021&view=rev
Log:
Add old-style API support to CFIF and CFRR.
Patch by Steeve Morin, reviewed by brandonwilliams for CASSANDRA-2799
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Nov 29 18:41:38 2011
@@ -1,7 +1,12 @@
-1.0.5
+1.0.6
* add command to stop compactions (CASSANDRA-1740)
* filter out unavailable cipher suites when using encryption (CASSANDRA-3178)
- * fix assertion error when forwarding to local nodes (CASSANDRA-3539)
+ * (HADOOP) add old-style api support for CFIF and CFRR (CASSANDRA-2799)
+
+1.0.5
+ * revert CASSANDRA-3407 (see CASSANDRA-3540)
+ * fix assertion error while forwarding writes to local nodes (CASSANDRA-3539)
+
1.0.4
* fix self-hinting of timed out read repair updates and make hinted handoff
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Nov 29 18:41:38 2011
@@ -44,11 +44,14 @@ import org.apache.cassandra.thrift.KeyRa
import org.apache.cassandra.thrift.TokenRange;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,9 +74,18 @@ import org.slf4j.LoggerFactory;
* The default split size is 64k rows.
*/
public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+ implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+ public static final String MAPRED_TASK_ID = "mapred.task.id";
+ // We need this because the old Hadoop API wants us to "write"
+ // to the key and value, whereas the new one asks us for them.
+ // I chose 8 KB as the default max key size (instantiated only once), but you can
+ // override it in your jobConf with this setting.
+ public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
+ public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
+
private String keyspace;
private String cfName;
@@ -262,10 +274,39 @@ public class ColumnFamilyInputFormat ext
return map;
}
-
-
public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
}
+
+
+ //
+ // Old Hadoop API
+ //
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
+ {
+ TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
+ List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
+ org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
+ for (int i = 0; i < newInputSplits.size(); i++)
+ oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
+ return oldInputSplits;
+ }
+
+ public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+ {
+ TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
+ {
+ @Override
+ public void progress()
+ {
+ reporter.progress();
+ }
+ };
+
+ ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
+ recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
+ return recordReader;
+ }
+
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Nov 29 18:41:38 2011
@@ -51,7 +51,10 @@ import org.apache.thrift.transport.TFram
import org.apache.thrift.transport.TSocket;
public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
+ implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
{
+ public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
+
private ColumnFamilySplit split;
private RowIterator iter;
private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
@@ -64,6 +67,18 @@ public class ColumnFamilyRecordReader ex
private TSocket socket;
private Cassandra.Client client;
private ConsistencyLevel consistencyLevel;
+ private int keyBufferSize = 8192;
+
+ public ColumnFamilyRecordReader()
+ {
+ this(ColumnFamilyRecordReader.CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT);
+ }
+
+ public ColumnFamilyRecordReader(int keyBufferSize)
+ {
+ super();
+ this.keyBufferSize = keyBufferSize;
+ }
public void close()
{
@@ -387,4 +402,41 @@ public class ColumnFamilyRecordReader ex
return sc;
}
}
+
+
+ // Because the old Hadoop API wants us to write into the key and value,
+ // while the new one returns them, we need to copy the output of the new API
+ // to the old. Thus, expect a small performance hit.
+ // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
+ // and ColumnFamilyRecordReader don't support them, it should be fine for now.
+ public boolean next(ByteBuffer key, SortedMap<ByteBuffer, IColumn> value) throws IOException
+ {
+ if (this.nextKeyValue())
+ {
+ key.clear();
+ key.put(this.getCurrentKey());
+ key.rewind();
+
+ value.clear();
+ value.putAll(this.getCurrentValue());
+
+ return true;
+ }
+ return false;
+ }
+
+ public ByteBuffer createKey()
+ {
+ return ByteBuffer.wrap(new byte[this.keyBufferSize]);
+ }
+
+ public SortedMap<ByteBuffer, IColumn> createValue()
+ {
+ return new TreeMap<ByteBuffer, IColumn>();
+ }
+
+ public long getPos() throws IOException
+ {
+ return (long)iter.rowsRead();
+ }
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=1208021&r1=1208020&r2=1208021&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Tue Nov 29 18:41:38 2011
@@ -21,15 +21,15 @@ package org.apache.cassandra.hadoop;
*/
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-public class ColumnFamilySplit extends InputSplit implements Writable
+public class ColumnFamilySplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit
{
private String startToken;
private String endToken;