You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:16 UTC

[04/10] git commit: ACCUMULO-1783 Update tests for new functionality and API changes

ACCUMULO-1783 Update tests for new functionality and API changes


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e227ad4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e227ad4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e227ad4a

Branch: refs/heads/ACCUMULO-1783
Commit: e227ad4ab6d4277644e2aecefb1e2c8dc3db22cb
Parents: 58e5a7e
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:31:00 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:31:00 2013 -0700

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   |   2 +-
 .../apache/accumulo/pig/AccumuloStorage.java    |  40 ++-
 .../accumulo/pig/AccumuloStorageTest.java       | 297 +++++++++++++++++++
 3 files changed, 322 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 494fd72..2361dcf 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -208,7 +208,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
       AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
       if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+        LOG.debug("columns: " + columnFamilyColumnQualifierPairs);
         AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
       }
       

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 9c8f002..3759181 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalMap;
@@ -38,7 +39,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   
   protected final List<String> columnSpecs;
   
+  public AccumuloStorage() {
+    this("");
+  }
+  
   public AccumuloStorage(String columns) {
+    this.caster = new Utf8StorageConverter();
+    
     if (!StringUtils.isBlank(columns)) {
       String[] columnArray = StringUtils.split(columns, COMMA);
       columnSpecs = Lists.newArrayList(columnArray);
@@ -51,7 +58,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   protected Tuple getTuple(Key key, Value value) throws IOException {
     
     SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
-    List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
     
     List<Object> tupleEntries = Lists.newLinkedList();
     Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
@@ -103,16 +109,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return map;
   }
   
-  private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
-    Tuple tuple = TupleFactory.getInstance().newTuple(5);
-    tuple.set(0, new DataByteArray(colfam.getBytes()));
-    tuple.set(1, new DataByteArray(colqual.getBytes()));
-    tuple.set(2, new DataByteArray(colvis.getBytes()));
-    tuple.set(3, new Long(ts));
-    tuple.set(4, new DataByteArray(val.get()));
-    return tuple;
-  }
-  
   protected void configureInputFormat(Configuration conf) {
     AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
@@ -123,7 +119,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     
     Iterator<Object> tupleIter = tuple.iterator();
     
-    if (1 <= tuple.size()) {
+    if (1 >= tuple.size()) {
       log.debug("Ignoring tuple of size " + tuple.size());
       return Collections.emptyList();
     }
@@ -146,7 +142,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
       }
       
       // Grab the type for this field
-      byte type = schemaToType(o, fieldSchemas[tupleOffset]);
+      byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
       
       // If we have a Map, we want to treat every Entry as a column in this record
       // placing said column in the column family unless this instance of AccumuloStorage
@@ -164,10 +160,22 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
           
           // If we have a CF, use it and push the Map's key down to the CQ
           if (null != cf) {
-            _cfHolder.set(cf);
-            _cqHolder.set(entry.getKey());
+            int index = cf.indexOf(COLON);
             
-            mutation.put(_cfHolder, _cqHolder, value);
+            // No colon in the provided column
+            if (-1 == index) {
+              _cfHolder.set(cf);
+              _cqHolder.set(entry.getKey());
+              
+              mutation.put(_cfHolder, _cqHolder, value);
+            } else {
+              _cfHolder.set(cf.getBytes(), 0, index);
+              
+              _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1));
+              _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length());
+              
+              mutation.put(_cfHolder, _cqHolder, value);
+            }
           } else {
             // Just put the Map's key into the CQ
             _cqHolder.set(entry.getKey());

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
new file mode 100644
index 0000000..38d0260
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -0,0 +1,205 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@code AccumuloStorage.getMutations}. Each input tuple carries the row id in field 0
+ * and values in the remaining fields; the values are written to the columns named by the column
+ * spec passed to the {@link AccumuloStorage} constructor.
+ */
+public class AccumuloStorageTest {
+  
+  /** Explicit charset so expected bytes do not depend on the platform default encoding. */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  
+  @Test
+  public void test1Tuple() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    // A tuple holding only the row id produces no mutations.
+    Assert.assertEquals(0, storage.getMutations(tuple("row")).size());
+  }
+  
+  @Test
+  public void test2TupleNoColumn() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage();
+    
+    // With no column spec configured, no mutation is expected for the value.
+    Assert.assertEquals(0, storage.getMutations(tuple("row", "value")).size());
+  }
+  
+  @Test
+  public void test2TupleWithColumn() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(tuple("row", "value")), "row");
+    Assert.assertEquals(1, colUpdates.size());
+    
+    // A spec without a colon becomes the qualifier, with an empty column family.
+    assertColumnUpdate(colUpdates.get(0), "", "col", "value");
+  }
+  
+  @Test
+  public void test2TupleWithColumnQual() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col:qual");
+    
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(tuple("row", "value")), "row");
+    Assert.assertEquals(1, colUpdates.size());
+    
+    // A "family:qualifier" spec is split on the colon.
+    assertColumnUpdate(colUpdates.get(0), "col", "qual", "value");
+  }
+  
+  @Test
+  public void test2TupleWithMixedColumns() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col1,col1:qual,col2:qual,col2");
+    
+    Tuple t = tuple("row", "value1", "value2", "value3", "value4");
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(t), "row");
+    Assert.assertEquals(4, colUpdates.size());
+    
+    // Updates are expected in the same order as the column spec.
+    assertColumnUpdate(colUpdates.get(0), "", "col1", "value1");
+    assertColumnUpdate(colUpdates.get(1), "col1", "qual", "value2");
+    assertColumnUpdate(colUpdates.get(2), "col2", "qual", "value3");
+    assertColumnUpdate(colUpdates.get(3), "", "col2", "value4");
+  }
+  
+  @Test
+  public void testIgnoredExtraColumns() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    // Only one column is configured, so the trailing "value2" is expected to be dropped.
+    Tuple t = tuple("row", "value1", "value2");
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(t), "row");
+    Assert.assertEquals(1, colUpdates.size());
+    
+    assertColumnUpdate(colUpdates.get(0), "", "col", "value1");
+  }
+  
+  @Test
+  public void testNonIgnoredExtraAsMap() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    // A trailing map beyond the column spec is still written: each entry gets an empty family.
+    Tuple t = tuple("row", "value1", mapColumns());
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(t), "row");
+    
+    Map<Entry<String,String>,String> expected = expectedMapColumns("", "");
+    expected.put(Maps.immutableEntry("", "col"), "value1");
+    assertUpdatesMatch(colUpdates, expected);
+  }
+  
+  @Test
+  public void testMapWithColFam() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col");
+    
+    // A map in a configured column uses the spec as the family and each map key as the qualifier.
+    Tuple t = tuple("row", mapColumns());
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(t), "row");
+    
+    assertUpdatesMatch(colUpdates, expectedMapColumns("col", ""));
+  }
+  
+  @Test
+  public void testMapWithColFamColQualPrefix() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage("col:qual_");
+    
+    // With a "family:prefix" spec, each map key is appended to the qualifier prefix.
+    Tuple t = tuple("row", mapColumns());
+    List<ColumnUpdate> colUpdates = assertSingleMutation(storage.getMutations(t), "row");
+    
+    assertUpdatesMatch(colUpdates, expectedMapColumns("col", "qual_"));
+  }
+  
+  /** Builds a tuple whose fields are {@code fields}, in order. */
+  private static Tuple tuple(Object... fields) throws IOException {
+    Tuple t = TupleFactory.getInstance().newTuple(fields.length);
+    for (int i = 0; i < fields.length; i++) {
+      t.set(i, fields[i]);
+    }
+    return t;
+  }
+  
+  /** Returns a fresh map of mapcol1..mapcol4 to mapval1..mapval4. */
+  private static Map<String,Object> mapColumns() {
+    Map<String,Object> map = Maps.newHashMap();
+    for (int i = 1; i <= 4; i++) {
+      map.put("mapcol" + i, "mapval" + i);
+    }
+    return map;
+  }
+  
+  /**
+   * Returns the (family, qualifier) to value pairs expected when {@link #mapColumns()} is written
+   * with column family {@code cf} and qualifier prefix {@code cqPrefix}. The result is mutable.
+   */
+  private static Map<Entry<String,String>,String> expectedMapColumns(String cf, String cqPrefix) {
+    Map<Entry<String,String>,String> expected = Maps.newHashMap();
+    for (int i = 1; i <= 4; i++) {
+      expected.put(Maps.immutableEntry(cf, cqPrefix + "mapcol" + i), "mapval" + i);
+    }
+    return expected;
+  }
+  
+  /**
+   * Asserts that exactly one mutation was produced and that it targets {@code row}.
+   * 
+   * @return the column updates of that mutation
+   */
+  private static List<ColumnUpdate> assertSingleMutation(Collection<Mutation> mutations, String row) {
+    Assert.assertEquals(1, mutations.size());
+    
+    Mutation m = mutations.iterator().next();
+    Assert.assertArrayEquals("Rows not equal", bytes(row), m.getRow());
+    return m.getUpdates();
+  }
+  
+  /** Asserts the column family, column qualifier and value of a single update. */
+  private static void assertColumnUpdate(ColumnUpdate update, String cf, String cq, String value) {
+    Assert.assertArrayEquals("CF not equal", bytes(cf), update.getColumnFamily());
+    Assert.assertArrayEquals("CQ not equal", bytes(cq), update.getColumnQualifier());
+    Assert.assertArrayEquals("Values not equal", bytes(value), update.getValue());
+  }
+  
+  /**
+   * Asserts that {@code updates} holds exactly the expected (family, qualifier) to value pairs, in
+   * any order. Passes arguments in JUnit's (expected, actual) order so failure messages read right.
+   */
+  private static void assertUpdatesMatch(List<ColumnUpdate> updates, Map<Entry<String,String>,String> expected) {
+    Assert.assertEquals(expected.size(), updates.size());
+    
+    Map<Entry<String,String>,String> remaining = Maps.newHashMap(expected);
+    for (ColumnUpdate update : updates) {
+      Entry<String,String> key = Maps.immutableEntry(string(update.getColumnFamily()), string(update.getColumnQualifier()));
+      Assert.assertTrue("Unexpected column " + key, remaining.containsKey(key));
+      Assert.assertEquals("Wrong value for column " + key, remaining.remove(key), string(update.getValue()));
+    }
+    
+    Assert.assertTrue("Did not find all expectations: " + remaining.keySet(), remaining.isEmpty());
+  }
+  
+  /** Encodes {@code s} as UTF-8. */
+  private static byte[] bytes(String s) {
+    return s.getBytes(UTF8);
+  }
+  
+  /** Decodes {@code b} as UTF-8. */
+  private static String string(byte[] b) {
+    return new String(b, UTF8);
+  }
+}