You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/31 04:25:16 UTC
[04/10] git commit: ACCUMULO-1783 Update tests for new functionality and API changes
ACCUMULO-1783 Update tests for new functionality and API changes
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e227ad4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e227ad4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e227ad4a
Branch: refs/heads/ACCUMULO-1783
Commit: e227ad4ab6d4277644e2aecefb1e2c8dc3db22cb
Parents: 58e5a7e
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 24 19:31:00 2013 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 24 19:31:00 2013 -0700
----------------------------------------------------------------------
.../accumulo/pig/AbstractAccumuloStorage.java | 2 +-
.../apache/accumulo/pig/AccumuloStorage.java | 40 ++-
.../accumulo/pig/AccumuloStorageTest.java | 297 +++++++++++++++++++
3 files changed, 322 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 494fd72..2361dcf 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -208,7 +208,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
if (columnFamilyColumnQualifierPairs.size() > 0) {
- LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+ LOG.debug("columns: " + columnFamilyColumnQualifierPairs);
AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
}
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 9c8f002..3759181 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalMap;
@@ -38,7 +39,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
protected final List<String> columnSpecs;
+ public AccumuloStorage() {
+ this("");
+ }
+
public AccumuloStorage(String columns) {
+ this.caster = new Utf8StorageConverter();
+
if (!StringUtils.isBlank(columns)) {
String[] columnArray = StringUtils.split(columns, COMMA);
columnSpecs = Lists.newArrayList(columnArray);
@@ -51,7 +58,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
protected Tuple getTuple(Key key, Value value) throws IOException {
SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
- List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
List<Object> tupleEntries = Lists.newLinkedList();
Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
@@ -103,16 +109,6 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
return map;
}
- private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
- Tuple tuple = TupleFactory.getInstance().newTuple(5);
- tuple.set(0, new DataByteArray(colfam.getBytes()));
- tuple.set(1, new DataByteArray(colqual.getBytes()));
- tuple.set(2, new DataByteArray(colvis.getBytes()));
- tuple.set(3, new Long(ts));
- tuple.set(4, new DataByteArray(val.get()));
- return tuple;
- }
-
protected void configureInputFormat(Configuration conf) {
AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
}
@@ -123,7 +119,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
Iterator<Object> tupleIter = tuple.iterator();
- if (1 <= tuple.size()) {
+ if (1 >= tuple.size()) {
log.debug("Ignoring tuple of size " + tuple.size());
return Collections.emptyList();
}
@@ -146,7 +142,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
}
// Grab the type for this field
- byte type = schemaToType(o, fieldSchemas[tupleOffset]);
+ byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]);
// If we have a Map, we want to treat every Entry as a column in this record
// placing said column in the column family unless this instance of AccumuloStorage
@@ -164,10 +160,22 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
// If we have a CF, use it and push the Map's key down to the CQ
if (null != cf) {
- _cfHolder.set(cf);
- _cqHolder.set(entry.getKey());
+ int index = cf.indexOf(COLON);
- mutation.put(_cfHolder, _cqHolder, value);
+ // No colon in the provided column
+ if (-1 == index) {
+ _cfHolder.set(cf);
+ _cqHolder.set(entry.getKey());
+
+ mutation.put(_cfHolder, _cqHolder, value);
+ } else {
+ _cfHolder.set(cf.getBytes(), 0, index);
+
+ _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1));
+ _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length());
+
+ mutation.put(_cfHolder, _cqHolder, value);
+ }
} else {
// Just put the Map's key into the CQ
_cqHolder.set(entry.getKey());
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e227ad4a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
new file mode 100644
index 0000000..38d0260
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -0,0 +1,297 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class AccumuloStorageTest {
+
+ @Test
+ public void test1Tuple() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ t.set(0, "row");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(0, mutations.size());
+ }
+
+ @Test
+ public void test2TupleNoColumn() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage();
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Assert.assertEquals(0, storage.getMutations(t).size());
+ }
+
+ @Test
+ public void test2TupleWithColumn() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+ }
+
+ @Test
+ public void test2TupleWithColumnQual() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col:qual");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, "value");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+ }
+
+ @Test
+ public void test2TupleWithMixedColumns() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col1,col1:qual,col2:qual,col2");
+
+ Tuple t = TupleFactory.getInstance().newTuple(5);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, "value2");
+ t.set(3, "value3");
+ t.set(4, "value4");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+
+ colUpdate = colUpdates.get(1);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value2".getBytes()));
+
+ colUpdate = colUpdates.get(2);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "qual".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
+
+ colUpdate = colUpdates.get(3);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
+ }
+
+ @Test
+ public void testIgnoredExtraColumns() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, "value2");
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(1, colUpdates.size());
+
+ ColumnUpdate colUpdate = colUpdates.get(0);
+ Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
+ Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+ Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+ }
+
+ @Test
+ public void testNonIgnoredExtraAsMap() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Map<String,Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(3);
+ t.set(0, "row");
+ t.set(1, "value1");
+ t.set(2, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(5, colUpdates.size());
+
+ Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("", "col"), "value1");
+ expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("", "mapcol4"), "mapval4");
+
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+ String value = new String(update.getValue());
+ Assert.assertTrue(expectations.containsKey(key));
+
+ String actual = expectations.remove(key);
+ Assert.assertEquals(value, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+ }
+
+ @Test
+ public void testMapWithColFam() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col");
+
+ Map<String,Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+ String value = new String(update.getValue());
+ Assert.assertTrue(expectations.containsKey(key));
+
+ String actual = expectations.remove(key);
+ Assert.assertEquals(value, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+ }
+
+ @Test
+ public void testMapWithColFamColQualPrefix() throws IOException {
+ AccumuloStorage storage = new AccumuloStorage("col:qual_");
+
+ Map<String,Object> map = Maps.newHashMap();
+
+ map.put("mapcol1", "mapval1");
+ map.put("mapcol2", "mapval2");
+ map.put("mapcol3", "mapval3");
+ map.put("mapcol4", "mapval4");
+
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, "row");
+ t.set(1, map);
+
+ Collection<Mutation> mutations = storage.getMutations(t);
+
+ Assert.assertEquals(1, mutations.size());
+
+ Mutation m = mutations.iterator().next();
+
+ Assert.assertTrue("Rows not equal", Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+ List<ColumnUpdate> colUpdates = m.getUpdates();
+ Assert.assertEquals(4, colUpdates.size());
+
+ Map<Entry<String,String>,String> expectations = Maps.newHashMap();
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol1"), "mapval1");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol2"), "mapval2");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol3"), "mapval3");
+ expectations.put(Maps.immutableEntry("col", "qual_mapcol4"), "mapval4");
+
+ for (ColumnUpdate update : colUpdates) {
+ Entry<String,String> key = Maps.immutableEntry(new String(update.getColumnFamily()), new String(update.getColumnQualifier()));
+ String value = new String(update.getValue());
+ Assert.assertTrue(expectations.containsKey(key));
+
+ String actual = expectations.remove(key);
+ Assert.assertEquals(value, actual);
+ }
+
+ Assert.assertTrue("Did not find all expectations", expectations.isEmpty());
+ }
+
+}