You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:17 UTC
[05/10] git commit: ACCUMULO-1783 Add in some tests for deserializing
Key/Value back into Tuple, fixing some bugs along the way.
ACCUMULO-1783 Add in some tests for deserializing Key/Value back into
Tuple, fixing some bugs along the way.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/8c46d9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/8c46d9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/8c46d9b3
Branch: refs/heads/ACCUMULO-1783
Commit: 8c46d9b3262e3568dd613f80569d1f11944cf262
Parents: e227ad4
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:55:01 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:55:01 2013 -0700
----------------------------------------------------------------------
.../apache/accumulo/pig/AccumuloStorage.java | 11 +-
.../accumulo/pig/AccumuloStorageTest.java | 112 +++++++++++++++++++
2 files changed, 120 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 3759181..030b0c3 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -67,6 +67,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
while (iter.hasNext()) {
if (null == currentEntry) {
currentEntry = iter.next();
+ aggregate.add(currentEntry);
} else {
Entry<Key,Value> nextEntry = iter.next();
@@ -75,11 +76,14 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
// Aggregate this entry into the map
aggregate.add(nextEntry);
} else {
+ currentEntry = nextEntry;
+
// Flush and start again
InternalMap map = aggregate(aggregate);
tupleEntries.add(map);
aggregate = Lists.newLinkedList();
+ aggregate.add(currentEntry);
}
}
}
@@ -92,7 +96,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
tuple.set(0, new DataByteArray(key.getRow().getBytes()));
int i = 1;
- for (Object obj : tupleEntries) {
+ for (Object obj : tupleEntries) {
tuple.set(i, obj);
i++;
}
@@ -100,10 +104,11 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return tuple;
}
- private InternalMap aggregate(List<Entry<Key,Value>> columns) {
+ private InternalMap aggregate(List<Entry<Key,Value>> columns) {
InternalMap map = new InternalMap();
for (Entry<Key,Value> column : columns) {
- map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+ map.put(column.getKey().getColumnFamily().toString() + COLON + column.getKey().getColumnQualifier().toString(),
+ new DataByteArray(column.getValue().get()));
}
return map;
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/8c46d9b3/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 38d0260..10777a8 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -8,12 +8,18 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class AccumuloStorageTest {
@@ -294,4 +300,110 @@ public class AccumuloStorageTest {
Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
}
+ @Test // A row holding one entry should come back as a two-field tuple: the row, then one map.
+ public void testSingleKey() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "", "col1")); // second component is left empty, so the expected map key below is ":col1"
+ values.add(new Value("value1".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values); // pack the parallel key/value lists into the single Value handed to getTuple
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(2, t.size()); // field 0 is the row, field 1 is the map
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put(":col1", new DataByteArray("value1")); // empty prefix before the separator, then "col1"
+
+ Assert.assertEquals(map, t.get(1));
+ }
+
+ @Test // Three entries sharing the prefix "col1" should collapse into one map, giving a two-field tuple.
+ public void testSingleColumn() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "col1", "cq1")); // all three keys use row "1" and "col1"; only the last component differs
+ keys.add(new Key("1", "col1", "cq2"));
+ keys.add(new Key("1", "col1", "cq3"));
+
+ values.add(new Value("value1".getBytes())); // values are matched to keys by list position
+ values.add(new Value("value2".getBytes()));
+ values.add(new Value("value3".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values); // pack the parallel key/value lists into the single Value handed to getTuple
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(2, t.size()); // field 0 is the row, field 1 is the single map
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put("col1:cq1", new DataByteArray("value1")); // expected keys join the second and third key components with ':'
+ map.put("col1:cq2", new DataByteArray("value2"));
+ map.put("col1:cq3", new DataByteArray("value3"));
+
+ Assert.assertEquals(map, t.get(1));
+ }
+
+ @Test // Entries under col1, col2 and col3 should yield a four-field tuple: the row plus one map per group.
+ public void testMultipleColumns() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ List<Key> keys = Lists.newArrayList();
+ List<Value> values = Lists.newArrayList();
+
+ keys.add(new Key("1", "col1", "cq1")); // group 1: three entries under "col1"
+ keys.add(new Key("1", "col1", "cq2"));
+ keys.add(new Key("1", "col1", "cq3"));
+ keys.add(new Key("1", "col2", "cq1")); // group 2: a single entry under "col2"
+ keys.add(new Key("1", "col3", "cq1")); // group 3: two entries under "col3"
+ keys.add(new Key("1", "col3", "cq2"));
+
+ values.add(new Value("value1".getBytes())); // values are matched to keys by list position
+ values.add(new Value("value2".getBytes()));
+ values.add(new Value("value3".getBytes()));
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value1".getBytes()));
+ values.add(new Value("value2".getBytes()));
+
+ Key k = new Key("1");
+ Value v = WholeRowIterator.encodeRow(keys, values); // pack the parallel key/value lists into the single Value handed to getTuple
+
+ Tuple t = storage.getTuple(k, v);
+
+ Assert.assertEquals(4, t.size()); // row field plus three maps
+
+ Assert.assertEquals("1", t.get(0).toString());
+
+ InternalMap map = new InternalMap();
+ map.put("col1:cq1", new DataByteArray("value1")); // expected contents of the first map (col1 entries)
+ map.put("col1:cq2", new DataByteArray("value2"));
+ map.put("col1:cq3", new DataByteArray("value3"));
+
+ Assert.assertEquals(map, t.get(1));
+
+ map = new InternalMap(); // fresh map for the col2 group
+ map.put("col2:cq1", new DataByteArray("value1"));
+
+ Assert.assertEquals(map, t.get(2));
+
+ map = new InternalMap(); // fresh map for the col3 group
+ map.put("col3:cq1", new DataByteArray("value1"));
+ map.put("col3:cq2", new DataByteArray("value2"));
+
+ Assert.assertEquals(map, t.get(3));
+ }
+
}