You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/10 22:08:01 UTC
svn commit: r1033713 - in /cassandra/branches/cassandra-0.7/contrib:
pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
word_count/src/WordCount.java
Author: jbellis
Date: Wed Nov 10 21:08:01 2010
New Revision: 1033713
URL: http://svn.apache.org/viewvc?rev=1033713&view=rev
Log:
update pig for ByteBuffers.
patch by tjake; reviewed by Jeremy Hanna for CASSANDRA-1725
Modified:
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1033713&r1=1033712&r2=1033713&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Nov 10 21:08:01 2010
@@ -68,18 +68,18 @@ public class CassandraStorage extends Lo
// load the next pair
if (!reader.nextKeyValue())
return null;
- byte[] key = (byte[])reader.getCurrentKey();
- SortedMap<byte[],IColumn> cf = (SortedMap<byte[],IColumn>)reader.getCurrentValue();
+
+ ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
+ SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
assert key != null && cf != null;
// and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
ArrayList<Tuple> columns = new ArrayList<Tuple>();
- tuple.set(0, new DataByteArray(key));
- for (Map.Entry<byte[], IColumn> entry : cf.entrySet())
+ tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+ for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
{
- byte[] name = entry.getKey();
- columns.add(columnToTuple(name, 0, name.length, entry.getValue()));
+ columns.add(columnToTuple(entry.getKey(), entry.getValue()));
}
tuple.set(1, new DefaultDataBag(columns));
@@ -91,25 +91,23 @@ public class CassandraStorage extends Lo
}
}
- private Tuple columnToTuple(byte[] name, int nameOffset, int nameLength, IColumn col) throws IOException
+ private Tuple columnToTuple(ByteBuffer name, IColumn col) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
- pair.set(0, new DataByteArray(name, nameOffset, nameLength));
+ pair.set(0, new DataByteArray(name.array(), name.position()+name.arrayOffset(), name.limit()+name.arrayOffset()));
if (col instanceof Column)
{
// standard
pair.set(1, new DataByteArray(col.value().array(),
col.value().position()+col.value().arrayOffset(),
- col.value().remaining()));
+ col.value().limit()+col.value().arrayOffset()));
return pair;
}
// super
ArrayList<Tuple> subcols = new ArrayList<Tuple>();
for (IColumn subcol : ((SuperColumn)col).getSubColumns())
- subcols.add(columnToTuple(subcol.name().array(),
- subcol.name().position()+subcol.name().arrayOffset(),
- subcol.name().remaining(), subcol));
+ subcols.add(columnToTuple(subcol.name(), subcol));
pair.set(1, new DefaultDataBag(subcols));
return pair;
Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java?rev=1033713&r1=1033712&r2=1033713&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Wed Nov 10 21:08:01 2010
@@ -70,15 +70,15 @@ public class WordCount extends Configure
System.exit(0);
}
- public static class TokenizerMapper extends Mapper<byte[], SortedMap<byte[], IColumn>, Text, IntWritable>
+ public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
- private String columnName;
+ private ByteBuffer columnName;
- public void map(byte[] key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
+ public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
{
- IColumn column = columns.get(columnName.getBytes());
+ IColumn column = columns.get(columnName);
if (column == null)
return;
String value = ByteBufferUtil.string(column.value());
@@ -95,7 +95,7 @@ public class WordCount extends Configure
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException
{
- this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+ this.columnName = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
}
}