You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/10 22:08:01 UTC

svn commit: r1033713 - in /cassandra/branches/cassandra-0.7/contrib: pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java word_count/src/WordCount.java

Author: jbellis
Date: Wed Nov 10 21:08:01 2010
New Revision: 1033713

URL: http://svn.apache.org/viewvc?rev=1033713&view=rev
Log:
update pig for ByteBuffers.
patch by tjake; reviewed by Jeremy Hanna for CASSANDRA-1725

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1033713&r1=1033712&r2=1033713&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Nov 10 21:08:01 2010
@@ -68,18 +68,18 @@ public class CassandraStorage extends Lo
             // load the next pair
             if (!reader.nextKeyValue())
                 return null;
-            byte[] key = (byte[])reader.getCurrentKey();
-            SortedMap<byte[],IColumn> cf = (SortedMap<byte[],IColumn>)reader.getCurrentValue();
+
+            ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
+            SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
             assert key != null && cf != null;
             
             // and wrap it in a tuple
-		    Tuple tuple = TupleFactory.getInstance().newTuple(2);
+	    Tuple tuple = TupleFactory.getInstance().newTuple(2);
             ArrayList<Tuple> columns = new ArrayList<Tuple>();
-            tuple.set(0, new DataByteArray(key));
-            for (Map.Entry<byte[], IColumn> entry : cf.entrySet())
+            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+            for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
             {                    
-                byte[] name = entry.getKey();
-                columns.add(columnToTuple(name, 0, name.length, entry.getValue()));
+                columns.add(columnToTuple(entry.getKey(), entry.getValue()));
             }
          
             tuple.set(1, new DefaultDataBag(columns));
@@ -91,25 +91,23 @@ public class CassandraStorage extends Lo
         }
     }
 
-    private Tuple columnToTuple(byte[] name, int nameOffset, int nameLength, IColumn col) throws IOException
+    private Tuple columnToTuple(ByteBuffer name, IColumn col) throws IOException
     {
         Tuple pair = TupleFactory.getInstance().newTuple(2);
-        pair.set(0, new DataByteArray(name, nameOffset, nameLength));
+        pair.set(0, new DataByteArray(name.array(), name.position()+name.arrayOffset(), name.limit()+name.arrayOffset()));
         if (col instanceof Column)
         {
             // standard
             pair.set(1, new DataByteArray(col.value().array(), 
                                           col.value().position()+col.value().arrayOffset(),
-                                          col.value().remaining()));
+                                          col.value().limit()+col.value().arrayOffset()));
             return pair;
         }
 
         // super
         ArrayList<Tuple> subcols = new ArrayList<Tuple>();
         for (IColumn subcol : ((SuperColumn)col).getSubColumns())
-            subcols.add(columnToTuple(subcol.name().array(), 
-                                      subcol.name().position()+subcol.name().arrayOffset(),
-                                      subcol.name().remaining(), subcol));
+            subcols.add(columnToTuple(subcol.name(), subcol));
         
         pair.set(1, new DefaultDataBag(subcols));
         return pair;

Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java?rev=1033713&r1=1033712&r2=1033713&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Wed Nov 10 21:08:01 2010
@@ -70,15 +70,15 @@ public class WordCount extends Configure
         System.exit(0);
     }
 
-    public static class TokenizerMapper extends Mapper<byte[], SortedMap<byte[], IColumn>, Text, IntWritable>
+    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
     {
         private final static IntWritable one = new IntWritable(1);
         private Text word = new Text();
-        private String columnName;
+        private ByteBuffer columnName;
 
-        public void map(byte[] key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
+        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
         {
-            IColumn column = columns.get(columnName.getBytes());
+            IColumn column = columns.get(columnName);
             if (column == null)
                 return;
             String value = ByteBufferUtil.string(column.value());
@@ -95,7 +95,7 @@ public class WordCount extends Configure
         protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
             throws IOException, InterruptedException
         {
-            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+            this.columnName = ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
         }
         
     }