You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:17 UTC

[05/10] git commit: ACCUMULO-1783 Add in some tests for deserializing Key/Value back into Tuple, fixing some bugs along the way.

ACCUMULO-1783 Add in some tests for deserializing Key/Value back into
Tuple, fixing some bugs along the way.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/8c46d9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/8c46d9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/8c46d9b3

Branch: refs/heads/ACCUMULO-1783
Commit: 8c46d9b3262e3568dd613f80569d1f11944cf262
Parents: e227ad4
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:55:01 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:55:01 2013 -0700

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    |  11 +-
 .../accumulo/pig/AccumuloStorageTest.java       | 112 +++++++++++++++++++
 2 files changed, 120 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 3759181..030b0c3 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -67,6 +67,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     while (iter.hasNext()) {
       if (null == currentEntry) {
         currentEntry = iter.next();
+        aggregate.add(currentEntry);
       } else {
         Entry<Key,Value> nextEntry = iter.next();
         
@@ -75,11 +76,14 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
           // Aggregate this entry into the map
           aggregate.add(nextEntry);
         } else {
+          currentEntry = nextEntry;
+          
           // Flush and start again
           InternalMap map = aggregate(aggregate);
           tupleEntries.add(map);
           
           aggregate = Lists.newLinkedList();
+          aggregate.add(currentEntry);
         }
       }
     }
@@ -92,7 +96,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
     tuple.set(0, new DataByteArray(key.getRow().getBytes()));
     int i = 1;
-    for (Object obj : tupleEntries)  {
+    for (Object obj : tupleEntries) {
       tuple.set(i, obj);
       i++;
     }
@@ -100,10 +104,11 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return tuple;
   }
   
-  private InternalMap aggregate(List<Entry<Key,Value>> columns)  {
+  private InternalMap aggregate(List<Entry<Key,Value>> columns) {
     InternalMap map = new InternalMap();
     for (Entry<Key,Value> column : columns) {
-      map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+      map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
+          new DataByteArray(column.getValue().get()));
     }
     
     return map;

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 38d0260..10777a8 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -8,12 +8,18 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class AccumuloStorageTest {
@@ -294,4 +300,110 @@ public class AccumuloStorageTest {
     Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
   }
   
+  @Test
+  public void testSingleKey() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    // One cell in row "1": empty column family, column qualifier "col1".
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "", "col1"));
+    values.add(new Value("value1".getBytes()));
+    // Pack the cell into a single Value with WholeRowIterator.encodeRow.
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    // Expect two fields: the row id followed by one map.
+    Assert.assertEquals(2, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    // With an empty column family the expected map key is ":col1".
+    InternalMap map = new InternalMap();
+    map.put(":col1", new DataByteArray("value1"));
+    
+    Assert.assertEquals(map, t.get(1));
+  }
+  
+  @Test
+  public void testSingleColumn() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    // Three cells in row "1", all under column family "col1".
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "col1", "cq1"));
+    keys.add(new Key("1", "col1", "cq2"));
+    keys.add(new Key("1", "col1", "cq3"));
+    
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    values.add(new Value("value3".getBytes()));
+    // Pack all cells into a single Value with WholeRowIterator.encodeRow.
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    // Expect two fields: the row id followed by one map holding all three cells.
+    Assert.assertEquals(2, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    // Expected map keys take the form "family:qualifier".
+    InternalMap map = new InternalMap();
+    map.put("col1:cq1", new DataByteArray("value1"));
+    map.put("col1:cq2", new DataByteArray("value2"));
+    map.put("col1:cq3", new DataByteArray("value3"));
+    
+    Assert.assertEquals(map, t.get(1));
+  }
+  
+  @Test
+  public void testMultipleColumns() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    // Six cells in row "1" spread over column families "col1", "col2" and "col3".
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "col1", "cq1"));
+    keys.add(new Key("1", "col1", "cq2"));
+    keys.add(new Key("1", "col1", "cq3"));
+    keys.add(new Key("1", "col2", "cq1"));
+    keys.add(new Key("1", "col3", "cq1"));
+    keys.add(new Key("1", "col3", "cq2"));
+    // Values are listed in the same order as the keys above.
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    values.add(new Value("value3".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    // Expect four fields: the row id followed by one map per column family.
+    Assert.assertEquals(4, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    // Field 1: the three "col1" cells.
+    InternalMap map = new InternalMap();
+    map.put("col1:cq1", new DataByteArray("value1"));
+    map.put("col1:cq2", new DataByteArray("value2"));
+    map.put("col1:cq3", new DataByteArray("value3"));
+    
+    Assert.assertEquals(map, t.get(1));
+    // Field 2: the single "col2" cell.
+    map = new InternalMap();
+    map.put("col2:cq1", new DataByteArray("value1"));
+    
+    Assert.assertEquals(map, t.get(2));
+    // Field 3: the two "col3" cells.
+    map = new InternalMap();
+    map.put("col3:cq1", new DataByteArray("value1"));
+    map.put("col3:cq2", new DataByteArray("value2"));
+    
+    Assert.assertEquals(map, t.get(3));
+  }
+  
 }