Posted to commits@pig.apache.org by da...@apache.org on 2014/01/17 07:36:44 UTC

svn commit: r1559022 [2/2] - in /pig/trunk: ./ ivy/ src/org/apache/pig/backend/hadoop/accumulo/ test/ test/org/apache/pig/backend/hadoop/accumulo/

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAbstractAccumuloStorage.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAbstractAccumuloStorage {
+
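+    /**
+     * Asserts that two Configurations contain the same entries. Both confs
+     * are iterated so that a key present in only one of them is caught: the
+     * missing side returns null and the comparison fails.
+     */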
+    public static void assertConfigurationsEqual(Configuration expectedConf,
+            Configuration actualConf) {
+        // Make sure the values in both confs are equal
+        Iterator<Entry<String, String>> expectedIter = expectedConf.iterator();
+        while (expectedIter.hasNext()) {
+            Entry<String, String> e = expectedIter.next();
+            assertEquals("Values differed for " + e.getKey(),
+                    expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
+        }
+
+        Iterator<Entry<String, String>> actualIter = actualConf.iterator();
+        while (actualIter.hasNext()) {
+            Entry<String, String> e = actualIter.next();
+            assertEquals("Values differed for " + e.getKey(),
+                    expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
+        }
+    }
+
+    public static void assertKeyValueEqualsTuple(Key key, Value value,
+            Tuple tuple) throws ExecException {
+        assertTrue(Arrays.equals(key.getRow().getBytes(),
+                ((DataByteArray) tuple.get(0)).get()));
+        assertTrue(Arrays.equals(key.getColumnFamily().getBytes(),
+                ((DataByteArray) tuple.get(1)).get()));
+        assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(),
+                ((DataByteArray) tuple.get(2)).get()));
+        assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(),
+                ((DataByteArray) tuple.get(3)).get()));
+        assertEquals(key.getTimestamp(), ((Long) tuple.get(4)).longValue());
+        assertTrue(Arrays.equals(value.get(),
+                ((DataByteArray) tuple.get(5)).get()));
+    }
+
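+    /**
+     * Asserts that a WholeRowIterator-encoded Key/Value pair matches a tuple
+     * of (row, bag of (colfam, colqual, visibility, timestamp, value)).
+     */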
+    public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value,
+            Tuple mainTuple) throws IOException {
+        assertTrue(Arrays.equals(key.getRow().getBytes(),
+                ((DataByteArray) mainTuple.get(0)).get()));
+
+        DefaultDataBag bag = (DefaultDataBag) mainTuple.get(1);
+        Iterator<Tuple> iter = bag.iterator();
+
+        for (Entry<Key, Value> e : WholeRowIterator.decodeRow(key, value)
+                .entrySet()) {
+            Tuple tuple = iter.next();
+
+            assertTrue(Arrays.equals(e.getKey().getColumnFamily().getBytes(),
+                    ((DataByteArray) tuple.get(0)).get()));
+            assertTrue(Arrays.equals(
+                    e.getKey().getColumnQualifier().getBytes(),
+                    ((DataByteArray) tuple.get(1)).get()));
+            assertTrue(Arrays.equals(e.getKey().getColumnVisibility()
+                    .getBytes(), ((DataByteArray) tuple.get(2)).get()));
+            assertEquals(e.getKey().getTimestamp(),
+                    ((Long) tuple.get(3)).longValue());
+            assertTrue(Arrays.equals(e.getValue().get(),
+                    ((DataByteArray) tuple.get(4)).get()));
+        }
+    }
+
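+    /**
+     * Builds a Job configured directly through AccumuloInputFormat, for
+     * comparison against a Job configured from a load location string.
+     */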
+    public Job getExpectedLoadJob(String inst, String zookeepers, String user,
+            String password, String table, String start, String end,
+            Authorizations authorizations,
+            List<Pair<Text, Text>> columnFamilyColumnQualifierPairs)
+            throws IOException {
+        Collection<Range> ranges = new LinkedList<Range>();
+        ranges.add(new Range(start, end));
+
+        Job expected = new Job(new Configuration());
+
+        try {
+            AccumuloInputFormat.setConnectorInfo(expected, user,
+                    new PasswordToken(password));
+        } catch (AccumuloSecurityException e) {
+            Assert.fail(e.getMessage());
+        }
+        AccumuloInputFormat.setInputTableName(expected, table);
+        AccumuloInputFormat.setScanAuthorizations(expected, authorizations);
+        AccumuloInputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+        AccumuloInputFormat.fetchColumns(expected,
+                columnFamilyColumnQualifierPairs);
+        AccumuloInputFormat.setRanges(expected, ranges);
+
+        return expected;
+    }
+
+    public Job getDefaultExpectedLoadJob() throws IOException {
+        String inst = "myinstance";
+        String zookeepers = "127.0.0.1:2181";
+        String user = "root";
+        String password = "secret";
+        String table = "table1";
+        String start = "abc";
+        String end = "z";
+        Authorizations authorizations = new Authorizations(
+                "PRIVATE,PUBLIC".split(","));
+
+        List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>();
+        columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(
+                "col1"), new Text("cq1")));
+        columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(
+                "col2"), new Text("cq2")));
+        columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(
+                "col3"), null));
+
+        Job expected = getExpectedLoadJob(inst, zookeepers, user, password,
+                table, start, end, authorizations,
+                columnFamilyColumnQualifierPairs);
+        return expected;
+    }
+
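+    /**
+     * Builds a Job configured directly through AccumuloOutputFormat, for
+     * comparison against a Job configured from a store location string.
+     */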
+    public Job getExpectedStoreJob(String inst, String zookeepers, String user,
+            String password, String table, long maxWriteBufferSize,
+            int writeThreads, int maxWriteLatencyMS) throws IOException {
+
+        Job expected = new Job(new Configuration());
+
+        try {
+            AccumuloOutputFormat.setConnectorInfo(expected, user,
+                    new PasswordToken(password));
+        } catch (AccumuloSecurityException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+        AccumuloOutputFormat.setCreateTables(expected, true);
+
+        BatchWriterConfig bwConfig = new BatchWriterConfig();
+        bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);
+        bwConfig.setMaxMemory(maxWriteBufferSize);
+        bwConfig.setMaxWriteThreads(writeThreads);
+
+        AccumuloOutputFormat.setBatchWriterOptions(expected, bwConfig);
+
+        return expected;
+    }
+
+    public Job getDefaultExpectedStoreJob() throws IOException {
+        String inst = "myinstance";
+        String zookeepers = "127.0.0.1:2181";
+        String user = "root";
+        String password = "secret";
+        String table = "table1";
+        long maxWriteBufferSize = 1234000;
+        int writeThreads = 7;
+        int maxWriteLatencyMS = 30000;
+
+        Job expected = getExpectedStoreJob(inst, zookeepers, user, password,
+                table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+        return expected;
+    }
+
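+    // The locations below encode, as query parameters, the same settings as
+    // the default expected jobs above.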
+    public String getDefaultLoadLocation() {
+        return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2,col3&start=abc&end=z";
+    }
+
+    public String getDefaultStoreLocation() {
+        return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
+    }
+
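+    /**
+     * Creates a concrete AbstractAccumuloStorage whose getMutations and
+     * getTuple are no-ops; sufficient for testing location parsing.
+     */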
+    public static AbstractAccumuloStorage getAbstractAccumuloStorage()
+            throws ParseException, IOException {
+        return getAbstractAccumuloStorage("");
+    }
+
+    public static AbstractAccumuloStorage getAbstractAccumuloStorage(
+            String columns) throws ParseException, IOException {
+        return getAbstractAccumuloStorage(columns, "");
+    }
+
+    public static AbstractAccumuloStorage getAbstractAccumuloStorage(
+            String columns, String args) throws ParseException, IOException {
+        return new AbstractAccumuloStorage(columns, args) {
+
+            @Override
+            public Collection<Mutation> getMutations(Tuple tuple) {
+                return null;
+            }
+
+            @Override
+            protected Tuple getTuple(Key key, Value value) throws IOException {
+                return null;
+            }
+        };
+    }
+
+    @Test
+    public void testSetLoadLocation() throws IOException, ParseException {
+        AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+
+        Job actual = new Job();
+        s.setLocation(getDefaultLoadLocation(), actual);
+        Configuration actualConf = actual.getConfiguration();
+
+        Job expected = getDefaultExpectedLoadJob();
+        Configuration expectedConf = expected.getConfiguration();
+
+        s.loadDependentJars(expectedConf);
+
+        assertConfigurationsEqual(expectedConf, actualConf);
+    }
+
+    @Test
+    public void testSetStoreLocation() throws IOException, ParseException {
+        AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+
+        Job actual = new Job();
+        s.setStoreLocation(getDefaultStoreLocation(), actual);
+        Configuration actualConf = actual.getConfiguration();
+
+        Job expected = getDefaultExpectedStoreJob();
+        Configuration expectedConf = expected.getConfiguration();
+
+        s.loadDependentJars(expectedConf);
+
+        assertConfigurationsEqual(expectedConf, actualConf);
+    }
+}

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloBinaryConverter.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAccumuloBinaryConverter {
+
+    AccumuloBinaryConverter converter;
+
+    @Before
+    public void setup() {
+        converter = new AccumuloBinaryConverter();
+    }
+
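+    // Each test round-trips an extreme value for the type through toBytes()
+    // and the corresponding bytesToX() method.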
+    @Test
+    public void testInts() throws IOException {
+        byte[] b = converter.toBytes(Integer.MAX_VALUE);
+        Assert.assertEquals(Integer.MAX_VALUE, converter.bytesToInteger(b)
+                .intValue());
+    }
+
+    @Test
+    public void testDoubles() throws IOException {
+        byte[] b = converter.toBytes(Double.MAX_VALUE);
+        Assert.assertEquals(Double.MAX_VALUE, converter.bytesToDouble(b)
+                .doubleValue(), 0);
+    }
+
+    @Test
+    public void testLongs() throws IOException {
+        byte[] b = converter.toBytes(Long.MAX_VALUE);
+        Assert.assertEquals(Long.MAX_VALUE, converter.bytesToLong(b)
+                .longValue());
+    }
+
+    @Test
+    public void testFloats() throws IOException {
+        byte[] b = converter.toBytes(Float.MAX_VALUE);
+        Assert.assertEquals(Float.MAX_VALUE, converter.bytesToFloat(b)
+                .floatValue(), 0f);
+    }
+
+    @Test
+    public void testBoolean() throws IOException {
+        Assert.assertTrue(converter.bytesToBoolean(converter.toBytes(true)));
+        Assert.assertFalse(converter.bytesToBoolean(converter.toBytes(false)));
+    }
+
+}

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloColumns.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import org.apache.pig.backend.hadoop.accumulo.Column.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAccumuloColumns {
+
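+    // Exercises Column's parsing of "colfam[:colqual]" specifications; per
+    // the assertions below, a trailing asterisk or colon marks a prefix
+    // match on the column family or qualifier.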
+    @Test(expected = NullPointerException.class)
+    public void testNull() {
+        new Column(null);
+    }
+
+    @Test
+    public void testEmptyColumn() {
+        Column c = new Column("");
+
+        Assert.assertEquals(Type.LITERAL, c.getType());
+        Assert.assertEquals("", c.getColumnFamily());
+        Assert.assertEquals(null, c.getColumnQualifier());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBlankColfamQual() {
+        new Column(":");
+    }
+
+    @Test
+    public void testColfamWithColqualRegex() {
+        Column c = new Column("cf:*");
+
+        Assert.assertEquals(Type.COLQUAL_PREFIX, c.getType());
+        Assert.assertEquals("cf", c.getColumnFamily());
+        Assert.assertEquals("", c.getColumnQualifier());
+    }
+
+    @Test
+    public void testColfamRegexEmptyColqual() {
+        Column c = new Column("cf:");
+
+        Assert.assertEquals(Type.COLQUAL_PREFIX, c.getType());
+        Assert.assertEquals("cf", c.getColumnFamily());
+        Assert.assertEquals("", c.getColumnQualifier());
+    }
+
+    @Test
+    public void testColfamRegex() {
+        Column c = new Column("cf*:");
+
+        Assert.assertEquals(Type.COLFAM_PREFIX, c.getType());
+        Assert.assertEquals("cf", c.getColumnFamily());
+        Assert.assertEquals("", c.getColumnQualifier());
+    }
+
+    @Test
+    public void testColfamRegexColqualRegex() {
+        // TODO Change test when cf*:cq* is supported
+        Column c = new Column("cf*:cq*");
+
+        Assert.assertEquals(Type.COLFAM_PREFIX, c.getType());
+        Assert.assertEquals("cf", c.getColumnFamily());
+        Assert.assertEquals("cq*", c.getColumnQualifier());
+    }
+}

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class TestAccumuloPigCluster {
+
+    @SuppressWarnings("unchecked")
+    private static final List<ImmutableMap<String, String>> AIRPORTS = Lists
+            .newArrayList(
+                    ImmutableMap.of("code", "SJC", "name", "San Jose"),
+                    ImmutableMap.of("code", "SFO", "name", "San Francisco"),
+                    ImmutableMap.of("code", "MDO", "name", "Orlando"),
+                    ImmutableMap.of("code", "MDW", "name", "Chicago-Midway"),
+                    ImmutableMap.of("code", "JFK", "name", "JFK International"),
+                    ImmutableMap.of("code", "BWI", "name",
+                            "Baltimore-Washington"));
+
+    @SuppressWarnings("unchecked")
+    private static final List<ImmutableMap<String, String>> FLIGHT_DATA = Lists
+            .newArrayList(
+                    ImmutableMap.of("origin", "BWI", "destination", "SFO"),
+                    ImmutableMap.of("origin", "BWI", "destination", "SJC"),
+                    ImmutableMap.of("origin", "MDW", "destination", "MDO"),
+                    ImmutableMap.of("origin", "MDO", "destination", "SJC"),
+                    ImmutableMap.of("origin", "SJC", "destination", "JFK"),
+                    ImmutableMap.of("origin", "JFK", "destination", "MDW"));
+
+    private static final Logger log = Logger
+            .getLogger(TestAccumuloPigCluster.class);
+    private static final File tmpdir = Files.createTempDir();
+    private static MiniAccumuloCluster accumuloCluster;
+    private static MiniCluster cluster;
+    private static Configuration conf;
+    private PigServer pig;
+
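+    /**
+     * Starts a single-tserver MiniAccumuloCluster alongside Pig's MiniCluster
+     * so that load and store statements run against a live Accumulo instance.
+     */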
+    @BeforeClass
+    public static void setupClusters() throws Exception {
+        MiniAccumuloConfig macConf = new MiniAccumuloConfig(tmpdir, "password");
+        macConf.setNumTservers(1);
+
+        accumuloCluster = new MiniAccumuloCluster(macConf);
+        accumuloCluster.start();
+
+        // This is needed by Pig
+        cluster = MiniCluster.buildCluster();
+        conf = cluster.getConfiguration();
+    }
+
+    @Before
+    public void beforeTest() throws Exception {
+        pig = new PigServer(ExecType.LOCAL, conf);
+    }
+
+    @AfterClass
+    public static void stopClusters() throws Exception {
+        accumuloCluster.stop();
+        FileUtils.deleteDirectory(tmpdir);
+    }
+
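+    /**
+     * Writes the airport and flight records into their own tables, one
+     * mutation per record, with a monotonically increasing row id.
+     */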
+    private void loadTestData() throws Exception {
+        ZooKeeperInstance inst = new ZooKeeperInstance(
+                accumuloCluster.getInstanceName(),
+                accumuloCluster.getZooKeepers());
+        Connector c = inst.getConnector("root", new PasswordToken("password"));
+
+        TableOperations tops = c.tableOperations();
+        if (!tops.exists("airports")) {
+            tops.create("airports");
+        }
+
+        if (!tops.exists("flights")) {
+            tops.create("flights");
+        }
+
+        BatchWriterConfig config = new BatchWriterConfig();
+        config.setMaxWriteThreads(1);
+        config.setMaxLatency(100000L, TimeUnit.MILLISECONDS);
+        config.setMaxMemory(10000L);
+
+        BatchWriter bw = c.createBatchWriter("airports", config);
+        try {
+            int i = 1;
+            for (Map<String, String> record : AIRPORTS) {
+                Mutation m = new Mutation(Integer.toString(i));
+
+                for (Entry<String, String> entry : record.entrySet()) {
+                    m.put(entry.getKey(), "", entry.getValue());
+                }
+
+                bw.addMutation(m);
+                i++;
+            }
+        } finally {
+            if (null != bw) {
+                bw.close();
+            }
+        }
+
+        bw = c.createBatchWriter("flights", config);
+        try {
+            int i = 1;
+            for (Map<String, String> record : FLIGHT_DATA) {
+                Mutation m = new Mutation(Integer.toString(i));
+
+                for (Entry<String, String> entry : record.entrySet()) {
+                    m.put(entry.getKey(), "", entry.getValue());
+                }
+
+                bw.addMutation(m);
+                i++;
+            }
+        } finally {
+            if (null != bw) {
+                bw.close();
+            }
+        }
+    }
+
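+    /**
+     * Loads both tables through AccumuloStorage, joins flights to airports
+     * on the origin airport code, and verifies that all six flights match.
+     */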
+    @Test
+    // (timeout = 60000)
+    public void test() throws Exception {
+        loadTestData();
+
+        final String loadFlights = "flights = LOAD 'accumulo://flights?instance="
+                + accumuloCluster.getInstanceName()
+                + "&user=root&password=password&zookeepers="
+                + accumuloCluster.getZooKeepers()
+                + "' using org.apache.pig.backend.hadoop.accumulo.AccumuloStorage()"
+                + " as (rowKey:chararray, column_map:map[]);";
+
+        final String loadAirports = "airports = LOAD 'accumulo://airports?instance="
+                + accumuloCluster.getInstanceName()
+                + "&user=root&password=password&zookeepers="
+                + accumuloCluster.getZooKeepers()
+                + "' using org.apache.pig.backend.hadoop.accumulo.AccumuloStorage()"
+                + " as (rowKey:chararray, column_map:map[]);";
+
+        final String joinQuery = "joined = JOIN flights BY column_map#'origin', airports BY column_map#'code';";
+
+        pig.registerQuery(loadFlights);
+        pig.registerQuery(loadAirports);
+        pig.registerQuery(joinQuery);
+
+        Iterator<Tuple> it = pig.openIterator("joined");
+
+        int i = 0;
+        while (it.hasNext()) {
+            Tuple t = it.next();
+
+            // id and map for each dataset we joined
+            Assert.assertEquals(4, t.size());
+
+            Object o = t.get(1);
+
+            Map<String, String> airport = null, flight = null;
+
+            Assert.assertTrue(Map.class.isAssignableFrom(o.getClass()));
+            @SuppressWarnings("unchecked")
+            Map<String, String> data1 = (Map<String, String>) o;
+            Assert.assertTrue(!data1.isEmpty());
+
+            if (data1.containsKey("origin")) {
+                flight = data1;
+            } else if (data1.containsKey("code")) {
+                airport = data1;
+            } else {
+                Assert.fail("Received map which did not contain an expected key");
+            }
+
+            o = t.get(3);
+
+            Assert.assertTrue(Map.class.isAssignableFrom(o.getClass()));
+            @SuppressWarnings("unchecked")
+            Map<String, String> data2 = (Map<String, String>) o;
+            Assert.assertTrue(!data2.isEmpty());
+
+            if (null == flight && data2.containsKey("origin")) {
+                flight = data2;
+            } else if (null == airport && data2.containsKey("code")) {
+                airport = data2;
+            } else {
+                Assert.fail("Received map which did not contain an expected key");
+            }
+
+            Assert.assertTrue(null != airport && null != flight);
+
+            Assert.assertEquals(airport.get("code"), flight.get("origin"));
+
+            i++;
+        }
+
+        Assert.assertEquals(6, i);
+    }
+
+}

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorage.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,840 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.commons.cli.ParseException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalMap;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestAccumuloStorage {
+
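+    // These tests check how the column specification given to AccumuloStorage
+    // maps tuple fields to column updates: a bare name writes the field under
+    // that column family, "cf:cq" writes it under a specific qualifier, and a
+    // trailing colon or asterisk expands a Pig map into one update per entry.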
+    @Test
+    public void testWrite1Tuple() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage();
+
+        Tuple t = TupleFactory.getInstance().newTuple(1);
+        t.set(0, "row");
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(0, mutations.size());
+    }
+
+    @Test(expected = ClassCastException.class)
+    public void testWriteLiteralAsMap() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage();
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, "value");
+
+        storage.getMutations(t).size();
+    }
+
+    @Test(expected = ClassCastException.class)
+    public void testWriteLiteralAsMapWithAsterisk() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("*");
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, "value");
+
+        storage.getMutations(t).size();
+    }
+
+    @Test
+    public void testWrite2TupleWithColumn() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col");
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, "value");
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(1, colUpdates.size());
+
+        ColumnUpdate colUpdate = colUpdates.get(0);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+        Assert.assertTrue("CQ not equal",
+                Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+    }
+
+    @Test
+    public void testWrite2TupleWithColumnQual() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col:qual");
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, "value");
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(1, colUpdates.size());
+
+        ColumnUpdate colUpdate = colUpdates.get(0);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+        Assert.assertTrue("CQ not equal", Arrays.equals(
+                colUpdate.getColumnQualifier(), "qual".getBytes()));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value".getBytes()));
+    }
+
+    @Test
+    public void testWrite2TupleWithMixedColumns() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage(
+                "col1,col1:qual,col2:qual,col2");
+
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, "row");
+        t.set(1, "value1");
+        t.set(2, "value2");
+        t.set(3, "value3");
+        t.set(4, "value4");
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(4, colUpdates.size());
+
+        ColumnUpdate colUpdate = colUpdates.get(0);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+        Assert.assertTrue("CQ not equal",
+                Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+
+        colUpdate = colUpdates.get(1);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+        Assert.assertTrue("CQ not equal", Arrays.equals(
+                colUpdate.getColumnQualifier(), "qual".getBytes()));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value2".getBytes()));
+
+        colUpdate = colUpdates.get(2);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+        Assert.assertTrue("CQ not equal", Arrays.equals(
+                colUpdate.getColumnQualifier(), "qual".getBytes()));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value3".getBytes()));
+
+        colUpdate = colUpdates.get(3);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+        Assert.assertTrue("CQ not equal",
+                Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value4".getBytes()));
+    }
+
+    @Test
+    public void testWriteIgnoredExtraColumns() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col");
+
+        Tuple t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, "row");
+        t.set(1, "value1");
+        t.set(2, "value2");
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(1, colUpdates.size());
+
+        ColumnUpdate colUpdate = colUpdates.get(0);
+        Assert.assertTrue("CF not equal",
+                Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+        Assert.assertTrue("CQ not equal",
+                Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
+        Assert.assertTrue("Values not equal",
+                Arrays.equals(colUpdate.getValue(), "value1".getBytes()));
+    }
+
+    @Test
+    public void testWriteIgnoredExtraMap() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col1");
+
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put("mapcol1", "mapval1");
+        map.put("mapcol2", "mapval2");
+        map.put("mapcol3", "mapval3");
+        map.put("mapcol4", "mapval4");
+
+        Tuple t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, "row");
+        t.set(1, "value1");
+        t.set(2, map);
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(1, colUpdates.size());
+
+        ColumnUpdate update = colUpdates.get(0);
+        Assert.assertEquals("col1", new String(update.getColumnFamily()));
+        Assert.assertEquals("", new String(update.getColumnQualifier()));
+        Assert.assertEquals("value1", new String(update.getValue()));
+    }
+
+    @Test
+    public void testWriteMultipleColumnsWithNonExpandedMap()
+            throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col1,col2");
+
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put("mapcol1", "mapval1");
+        map.put("mapcol2", "mapval2");
+        map.put("mapcol3", "mapval3");
+        map.put("mapcol4", "mapval4");
+
+        Tuple t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, "row");
+        t.set(1, "value1");
+        t.set(2, map);
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(2, colUpdates.size());
+
+        ColumnUpdate update = colUpdates.get(0);
+        Assert.assertEquals("col1", new String(update.getColumnFamily()));
+        Assert.assertEquals("", new String(update.getColumnQualifier()));
+        Assert.assertEquals("value1", new String(update.getValue()));
+
+        update = colUpdates.get(1);
+        Assert.assertEquals("col2", new String(update.getColumnFamily()));
+        Assert.assertEquals("", new String(update.getColumnQualifier()));
+        Assert.assertArrayEquals(storage.objToBytes(map, DataType.MAP),
+                update.getValue());
+    }
+
+    @Test
+    public void testWriteMultipleColumnsWithExpandedMap() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col1,col2:");
+
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put("mapcol1", "mapval1");
+        map.put("mapcol2", "mapval2");
+        map.put("mapcol3", "mapval3");
+        map.put("mapcol4", "mapval4");
+
+        Tuple t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, "row");
+        t.set(1, "value1");
+        t.set(2, map);
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(5, colUpdates.size());
+
+        ColumnUpdate update = colUpdates.get(0);
+        Assert.assertEquals("col1", new String(update.getColumnFamily()));
+        Assert.assertEquals("", new String(update.getColumnQualifier()));
+        Assert.assertEquals("value1", new String(update.getValue()));
+
+        Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+        expectations.put(Maps.immutableEntry("col2", "mapcol1"), "mapval1");
+        expectations.put(Maps.immutableEntry("col2", "mapcol2"), "mapval2");
+        expectations.put(Maps.immutableEntry("col2", "mapcol3"), "mapval3");
+        expectations.put(Maps.immutableEntry("col2", "mapcol4"), "mapval4");
+
+        for (int i = 1; i < 5; i++) {
+            update = colUpdates.get(i);
+            Entry<String, String> key = Maps.immutableEntry(
+                    new String(update.getColumnFamily()),
+                    new String(update.getColumnQualifier()));
+            String value = new String(update.getValue());
+            Assert.assertTrue("Did not find expected key: " + key,
+                    expectations.containsKey(key));
+
+            String actual = expectations.remove(key);
+            Assert.assertEquals(value, actual);
+        }
+
+        Assert.assertTrue("Did not find all expectations",
+                expectations.isEmpty());
+    }
+
+    @Test
+    public void testWriteMapWithColFamWithColon() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col:");
+
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put("mapcol1", "mapval1");
+        map.put("mapcol2", "mapval2");
+        map.put("mapcol3", "mapval3");
+        map.put("mapcol4", "mapval4");
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, map);
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(4, colUpdates.size());
+
+        Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+        expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+        expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+        expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+        expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+
+        for (ColumnUpdate update : colUpdates) {
+            Entry<String, String> key = Maps.immutableEntry(
+                    new String(update.getColumnFamily()),
+                    new String(update.getColumnQualifier()));
+            String value = new String(update.getValue());
+            Assert.assertTrue("Did not find expected key: " + key,
+                    expectations.containsKey(key));
+
+            String actual = expectations.remove(key);
+            Assert.assertEquals(value, actual);
+        }
+
+        Assert.assertTrue("Did not find all expectations",
+                expectations.isEmpty());
+    }
+
+    @Test
+    public void testWriteMapWithColFamWithColonAsterisk() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col:*");
+
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put("mapcol1", "mapval1");
+        map.put("mapcol2", "mapval2");
+        map.put("mapcol3", "mapval3");
+        map.put("mapcol4", "mapval4");
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, map);
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(4, colUpdates.size());
+
+        Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+        expectations.put(Maps.immutableEntry("col", "mapcol1"), "mapval1");
+        expectations.put(Maps.immutableEntry("col", "mapcol2"), "mapval2");
+        expectations.put(Maps.immutableEntry("col", "mapcol3"), "mapval3");
+        expectations.put(Maps.immutableEntry("col", "mapcol4"), "mapval4");
+
+        for (ColumnUpdate update : colUpdates) {
+            Entry<String, String> key = Maps.immutableEntry(
+                    new String(update.getColumnFamily()),
+                    new String(update.getColumnQualifier()));
+            String value = new String(update.getValue());
+            Assert.assertTrue("Did not find expected key: " + key,
+                    expectations.containsKey(key));
+
+            String actual = expectations.remove(key);
+            Assert.assertEquals(value, actual);
+        }
+
+        Assert.assertTrue("Did not find all expectations",
+                expectations.isEmpty());
+    }
+
+    @Test
+    public void testWriteMapWithColFamColQualPrefix() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col:qual_*");
+
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put("mapcol1", "mapval1");
+        map.put("mapcol2", "mapval2");
+        map.put("mapcol3", "mapval3");
+        map.put("mapcol4", "mapval4");
+
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, "row");
+        t.set(1, map);
+
+        Collection<Mutation> mutations = storage.getMutations(t);
+
+        Assert.assertEquals(1, mutations.size());
+
+        Mutation m = mutations.iterator().next();
+
+        Assert.assertTrue("Rows not equal",
+                Arrays.equals(m.getRow(), ((String) t.get(0)).getBytes()));
+
+        List<ColumnUpdate> colUpdates = m.getUpdates();
+        Assert.assertEquals(4, colUpdates.size());
+
+        Map<Entry<String, String>, String> expectations = Maps.newHashMap();
+        expectations.put(Maps.immutableEntry("col", "qual_mapcol1"), "mapval1");
+        expectations.put(Maps.immutableEntry("col", "qual_mapcol2"), "mapval2");
+        expectations.put(Maps.immutableEntry("col", "qual_mapcol3"), "mapval3");
+        expectations.put(Maps.immutableEntry("col", "qual_mapcol4"), "mapval4");
+
+        for (ColumnUpdate update : colUpdates) {
+            Entry<String, String> key = Maps.immutableEntry(
+                    new String(update.getColumnFamily()),
+                    new String(update.getColumnQualifier()));
+            String value = new String(update.getValue());
+            Assert.assertTrue(expectations.containsKey(key));
+
+            String actual = expectations.remove(key);
+            Assert.assertEquals(value, actual);
+        }
+
+        Assert.assertTrue("Did not find all expectations",
+                expectations.isEmpty());
+    }
+
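+    // The read tests below feed WholeRowIterator-encoded rows to getTuple()
+    // and check the shape of the resulting tuple: (row, map) by default, or
+    // (row, scalar/map per column) when a column list is given.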
+    @Test
+    public void testReadSingleKey() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage();
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "", "col1"));
+        values.add(new Value("value1".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(2, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        InternalMap map = new InternalMap();
+        map.put(":col1", new DataByteArray("value1"));
+
+        Assert.assertEquals(map, t.get(1));
+    }
+
+    @Test
+    public void testReadSingleColumn() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage();
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "col1", "cq1"));
+        keys.add(new Key("1", "col1", "cq2"));
+        keys.add(new Key("1", "col1", "cq3"));
+
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+        values.add(new Value("value3".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(2, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        InternalMap map = new InternalMap();
+        map.put("col1:cq1", new DataByteArray("value1"));
+        map.put("col1:cq2", new DataByteArray("value2"));
+        map.put("col1:cq3", new DataByteArray("value3"));
+
+        Assert.assertEquals(map, t.get(1));
+    }
+
+    @Test
+    public void testReadMultipleColumnsAggregateColfamsAsterisk()
+            throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage("*");
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "col1", "cq1"));
+        keys.add(new Key("1", "col1", "cq2"));
+        keys.add(new Key("1", "col1", "cq3"));
+        keys.add(new Key("1", "col2", "cq1"));
+        keys.add(new Key("1", "col3", "cq1"));
+        keys.add(new Key("1", "col3", "cq2"));
+
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+        values.add(new Value("value3".getBytes()));
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(2, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        InternalMap map = new InternalMap();
+        map.put("col1:cq1", new DataByteArray("value1"));
+        map.put("col1:cq2", new DataByteArray("value2"));
+        map.put("col1:cq3", new DataByteArray("value3"));
+        map.put("col2:cq1", new DataByteArray("value1"));
+        map.put("col3:cq1", new DataByteArray("value1"));
+        map.put("col3:cq2", new DataByteArray("value2"));
+
+        Assert.assertEquals(map, t.get(1));
+    }
+
+    @Test
+    public void testReadMultipleColumnsAggregateColfamsAsteriskEmptyColfam()
+            throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage("*");
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "col1", ""));
+        keys.add(new Key("1", "col2", ""));
+        keys.add(new Key("1", "col3", ""));
+
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+        values.add(new Value("value3".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(2, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        InternalMap map = new InternalMap();
+        map.put("col1", new DataByteArray("value1"));
+        map.put("col2", new DataByteArray("value2"));
+        map.put("col3", new DataByteArray("value3"));
+
+        Assert.assertEquals(map, t.get(1));
+    }
+
+    @Test
+    public void testReadMultipleColumnsEmptyString() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage("");
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "col1", "cq1"));
+        keys.add(new Key("1", "col1", "cq2"));
+        keys.add(new Key("1", "col1", "cq3"));
+        keys.add(new Key("1", "col2", "cq1"));
+        keys.add(new Key("1", "col3", "cq1"));
+        keys.add(new Key("1", "col3", "cq2"));
+
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+        values.add(new Value("value3".getBytes()));
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(2, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        InternalMap map = new InternalMap();
+        map.put("col1:cq1", new DataByteArray("value1"));
+        map.put("col1:cq2", new DataByteArray("value2"));
+        map.put("col1:cq3", new DataByteArray("value3"));
+        map.put("col2:cq1", new DataByteArray("value1"));
+        map.put("col3:cq1", new DataByteArray("value1"));
+        map.put("col3:cq2", new DataByteArray("value2"));
+
+        Assert.assertEquals(map, t.get(1));
+    }
+
+    @Test
+    public void testReadMultipleColumnsNoColfamAggregate() throws IOException,
+            ParseException {
+        AccumuloStorage storage = new AccumuloStorage();
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "col1", "cq1"));
+        keys.add(new Key("1", "col1", "cq2"));
+        keys.add(new Key("1", "col1", "cq3"));
+        keys.add(new Key("1", "col2", "cq1"));
+        keys.add(new Key("1", "col3", "cq1"));
+        keys.add(new Key("1", "col3", "cq2"));
+
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+        values.add(new Value("value3".getBytes()));
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value1".getBytes()));
+        values.add(new Value("value2".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(2, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        InternalMap map = new InternalMap();
+        map.put("col1:cq1", new DataByteArray("value1"));
+        map.put("col1:cq2", new DataByteArray("value2"));
+        map.put("col1:cq3", new DataByteArray("value3"));
+        map.put("col2:cq1", new DataByteArray("value1"));
+        map.put("col3:cq1", new DataByteArray("value1"));
+        map.put("col3:cq2", new DataByteArray("value2"));
+
+        Assert.assertEquals(map, t.get(1));
+    }
+
+    @Test
+    public void testReadMultipleScalars() throws IOException, ParseException {
+        AccumuloStorage storage = new AccumuloStorage("col1,col3,col5");
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        // Filtering by AccumuloInputFormat isn't applied here since we're
+        // short-circuiting things
+        keys.add(new Key("1", "col1", ""));
+        // keys.add(new Key("1", "col2", ""));
+        keys.add(new Key("1", "col3", ""));
+        // keys.add(new Key("1", "col4", ""));
+        keys.add(new Key("1", "col5", ""));
+
+        values.add(new Value("value1".getBytes()));
+        // values.add(new Value("value2".getBytes()));
+        values.add(new Value("value3".getBytes()));
+        // values.add(new Value("value4".getBytes()));
+        values.add(new Value("value5".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(4, t.size());
+
+        Assert.assertEquals("1", t.get(0).toString());
+
+        Assert.assertEquals("value1", t.get(1).toString());
+        Assert.assertEquals("value3", t.get(2).toString());
+        Assert.assertEquals("value5", t.get(3).toString());
+    }
+
+    @Test
+    public void testUnsortedColumnList() throws IOException, ParseException {
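+        // Field order in the tuple follows the storage spec ("z,a"), not the
+        // sorted order of the underlying Keys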
+        AccumuloStorage storage = new AccumuloStorage("z,a");
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "a", ""));
+        keys.add(new Key("1", "z", ""));
+
+        values.add(new Value("a".getBytes()));
+        values.add(new Value("z".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(3, t.size());
+
+        Assert.assertEquals("z", t.get(1).toString());
+        Assert.assertEquals("a", t.get(2).toString());
+    }
+
+    @Test
+    public void testReadMultipleScalarsAndMaps() throws IOException,
+            ParseException {
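+        // Mixed spec: "z" is a bare scalar, "r:" and "b:" are whole-family
+        // maps, and "m:2" pins a single qualifier as a scalar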
+        AccumuloStorage storage = new AccumuloStorage("z,r:,m:2,b:");
+
+        List<Key> keys = Lists.newArrayList();
+        List<Value> values = Lists.newArrayList();
+
+        keys.add(new Key("1", "a", "1"));
+        keys.add(new Key("1", "b", "1"));
+        keys.add(new Key("1", "b", "2"));
+        keys.add(new Key("1", "f", "1"));
+        keys.add(new Key("1", "f", "2"));
+        keys.add(new Key("1", "m", "1"));
+        keys.add(new Key("1", "m", "2"));
+        keys.add(new Key("1", "r", "1"));
+        keys.add(new Key("1", "r", "2"));
+        keys.add(new Key("1", "r", "3"));
+        keys.add(new Key("1", "z", ""));
+
+        values.add(new Value("a1".getBytes()));
+        values.add(new Value("b1".getBytes()));
+        values.add(new Value("b2".getBytes()));
+        values.add(new Value("f1".getBytes()));
+        values.add(new Value("f2".getBytes()));
+        values.add(new Value("m1".getBytes()));
+        values.add(new Value("m2".getBytes()));
+        values.add(new Value("r1".getBytes()));
+        values.add(new Value("r2".getBytes()));
+        values.add(new Value("r3".getBytes()));
+        values.add(new Value("z1".getBytes()));
+
+        Key k = new Key("1");
+        Value v = WholeRowIterator.encodeRow(keys, values);
+
+        Tuple t = storage.getTuple(k, v);
+
+        Assert.assertEquals(5, t.size());
+
+        Assert.assertEquals(new DataByteArray("z1".getBytes()), t.get(1));
+
+        HashMap<String, DataByteArray> rMap = new HashMap<String, DataByteArray>();
+        rMap.put("r:1", new DataByteArray("r1".getBytes()));
+        rMap.put("r:2", new DataByteArray("r2".getBytes()));
+        rMap.put("r:3", new DataByteArray("r3".getBytes()));
+        Assert.assertEquals(rMap, t.get(2));
+
+        Assert.assertEquals(new DataByteArray("m2".getBytes()), t.get(3));
+
+        HashMap<String, DataByteArray> bMap = new HashMap<String, DataByteArray>();
+        bMap.put("b:1", new DataByteArray("b1".getBytes()));
+        bMap.put("b:2", new DataByteArray("b2".getBytes()));
+        Assert.assertEquals(bMap, t.get(4));
+    }
+
+}
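
The getTuple() tests above all hand AccumuloStorage a row that has been packed
by WholeRowIterator. A minimal, self-contained sketch of that encode/decode
round trip follows; the class name and sample row are illustrative only and
are not part of this commit:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedMap;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.user.WholeRowIterator;

    public class WholeRowRoundTrip {
        public static void main(String[] args) throws IOException {
            List<Key> keys = Arrays.asList(
                    new Key("row1", "cf", "cq1"),
                    new Key("row1", "cf", "cq2"));
            List<Value> values = Arrays.asList(
                    new Value("v1".getBytes()),
                    new Value("v2".getBytes()));

            // Pack the whole row into a single Value, exactly what the tests
            // above feed to storage.getTuple(key, value)
            Value encoded = WholeRowIterator.encodeRow(keys, values);

            // Unpack it again; each entry comes back under its original Key
            SortedMap<Key, Value> decoded = WholeRowIterator.decodeRow(
                    new Key("row1"), encoded);
            System.out.println(decoded.size()); // prints 2
        }
    }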

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageConfiguration.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestAccumuloStorageConfiguration {
+
+    protected Configuration original;
+    protected AccumuloStorage storage;
+
+    @Before
+    public void setup() throws ParseException, IOException {
+        storage = new AccumuloStorage();
+
+        original = new Configuration();
+
+        original.set("string1", "value1");
+        original.set("string2", "value2");
+        original.set("string3", "value3");
+        original.setBoolean("boolean", true);
+        original.setLong("long", 10);
+        original.setInt("integer", 20);
+    }
+
+    protected Map<String, String> getContents(Configuration conf) {
+        Map<String, String> contents = new HashMap<String, String>();
+        Iterator<Entry<String, String>> iter = conf.iterator();
+        while (iter.hasNext()) {
+            Entry<String, String> entry = iter.next();
+
+            contents.put(entry.getKey(), entry.getValue());
+        }
+
+        return contents;
+    }
+
+    @Test
+    public void testClearEquivalenceStrings() {
+        Configuration clearCopy = new Configuration(original);
+
+        Assert.assertEquals(getContents(original), getContents(clearCopy));
+
+        Map<String, String> entriesToUnset = new HashMap<String, String>();
+        entriesToUnset.put("string1", "foo");
+        entriesToUnset.put("string3", "bar");
+
+        storage.clearUnset(clearCopy, entriesToUnset);
+
+        Configuration originalCopy = new Configuration();
+        for (Entry<String, String> entry : original) {
+            if (!"string1".equals(entry.getKey())
+                    && !"string3".equals(entry.getKey())) {
+                originalCopy.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+    }
+
+    @Test
+    public void testClearEquivalenceOnTypes() {
+        Configuration clearCopy = new Configuration(original);
+
+        Assert.assertEquals(getContents(original), getContents(clearCopy));
+
+        Map<String, String> entriesToUnset = new HashMap<String, String>();
+        entriesToUnset.put("long", "foo");
+        entriesToUnset.put("boolean", "bar");
+        entriesToUnset.put("integer", "foobar");
+
+        storage.clearUnset(clearCopy, entriesToUnset);
+
+        Configuration originalCopy = new Configuration();
+        for (Entry<String, String> entry : original) {
+            if (!"long".equals(entry.getKey())
+                    && !"boolean".equals(entry.getKey())
+                    && !"integer".equals(entry.getKey())) {
+                originalCopy.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+    }
+
+    // Ignored until the dependency gets updated to Hadoop >= 1.2.0 and we
+    // have Configuration#unset
+    @Ignore
+    @Test
+    public void testUnsetEquivalenceStrings() {
+        Configuration unsetCopy = new Configuration(original);
+
+        Assert.assertEquals(getContents(original), getContents(unsetCopy));
+
+        Map<String, String> entriesToUnset = new HashMap<String, String>();
+        entriesToUnset.put("string1", "foo");
+        entriesToUnset.put("string3", "bar");
+
+        storage.simpleUnset(unsetCopy, entriesToUnset);
+
+        Configuration originalCopy = new Configuration();
+        for (Entry<String, String> entry : original) {
+            if (!"string1".equals(entry.getKey())
+                    && !"string2".equals(entry.getKey())) {
+                originalCopy.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+    }
+
+    // Ignored until the dependency gets updated to Hadoop >= 1.2.0 and we
+    // have Configuration#unset
+    @Ignore
+    @Test
+    public void testEquivalenceStrings() {
+        Configuration unsetCopy = new Configuration(original), clearCopy = new Configuration(
+                original);
+
+        Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+        Map<String, String> entriesToUnset = new HashMap<String, String>();
+        entriesToUnset.put("string1", "foo");
+        entriesToUnset.put("string3", "bar");
+
+        storage.simpleUnset(unsetCopy, entriesToUnset);
+        storage.clearUnset(clearCopy, entriesToUnset);
+
+        Assert.assertEquals(getContents(unsetCopy), getContents(clearCopy));
+
+        Configuration originalCopy = new Configuration();
+        for (Entry<String, String> entry : original) {
+            if (!"string1".equals(entry.getKey())
+                    && !"string2".equals(entry.getKey())) {
+                originalCopy.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+        Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+    }
+
+    // Ignored until the dependency gets updated to Hadoop >= 1.2.0 and we
+    // have Configuration#unset
+    @Ignore
+    @Test
+    public void testEquivalenceOnTypes() {
+        Configuration unsetCopy = new Configuration(original), clearCopy = new Configuration(
+                original);
+
+        Assert.assertEquals(getContents(original), getContents(unsetCopy));
+        Assert.assertEquals(getContents(original), getContents(clearCopy));
+
+        Map<String, String> entriesToUnset = new HashMap<String, String>();
+        entriesToUnset.put("long", "foo");
+        entriesToUnset.put("boolean", "bar");
+        entriesToUnset.put("integer", "foobar");
+
+        storage.simpleUnset(unsetCopy, entriesToUnset);
+        storage.clearUnset(clearCopy, entriesToUnset);
+
+        Configuration originalCopy = new Configuration();
+        for (Entry<String, String> entry : original) {
+            if (!"long".equals(entry.getKey())
+                    && !"boolean".equals(entry.getKey())
+                    && !"integer".equals(entry.getKey())) {
+                originalCopy.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+        Assert.assertEquals(getContents(originalCopy), getContents(clearCopy));
+    }
+
+    // Ignored until the dependency gets updated to Hadoop >= 1.2.0 and we
+    // have Configuration#unset
+    @Ignore
+    @Test
+    public void testUnsetEquivalenceOnTypes() {
+        Configuration unsetCopy = new Configuration(original);
+
+        Assert.assertEquals(getContents(original), getContents(unsetCopy));
+
+        Map<String, String> entriesToUnset = new HashMap<String, String>();
+        entriesToUnset.put("long", "foo");
+        entriesToUnset.put("boolean", "bar");
+        entriesToUnset.put("integer", "foobar");
+
+        storage.simpleUnset(unsetCopy, entriesToUnset);
+
+        Configuration originalCopy = new Configuration();
+        for (Entry<String, String> entry : original) {
+            if (!"long".equals(entry.getKey())
+                    && !"boolean".equals(entry.getKey())
+                    && !"integer".equals(entry.getKey())) {
+                originalCopy.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        Assert.assertEquals(getContents(originalCopy), getContents(unsetCopy));
+    }
+}
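
The clearUnset/simpleUnset pair exercised above compensates for the missing
Configuration#unset in pre-1.2.0 Hadoop. The tests pin down the contract: the
configuration must end up identical to one rebuilt without the given keys. A
hedged sketch of that clear-and-rebuild strategy (the real implementation
lives in the storage class under test and may differ in detail):

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    public class UnsetSketch {
        // With no Configuration#unset available, "unsetting" keys means
        // snapshotting the conf, clearing it, and re-adding every entry
        // whose key is not slated for removal
        static void clearUnset(Configuration conf, Map<String, String> toUnset) {
            Configuration snapshot = new Configuration(conf);
            conf.clear();
            for (Map.Entry<String, String> e : snapshot) {
                if (!toUnset.containsKey(e.getKey())) {
                    conf.set(e.getKey(), e.getValue());
                }
            }
        }
    }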

Added: pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java?rev=1559022&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java (added)
+++ pig/trunk/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloStorageOptions.java Fri Jan 17 06:36:44 2014
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.accumulo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.cli.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAccumuloStorageOptions {
+
+    @Test
+    public void testFetchColumns() throws ParseException, IOException {
+        AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+                .getAbstractAccumuloStorage("cf1,cf2:cq2");
+
+        List<Column> pairs = new LinkedList<Column>();
+        pairs.add(new Column("cf1"));
+        pairs.add(new Column("cf2:cq2"));
+
+        Assert.assertEquals(pairs, storage.columns);
+    }
+
+    @Test
+    public void testWriteColumns() throws ParseException, IOException {
+        AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+                .getAbstractAccumuloStorage("foo,bar,baz,foo:bar,foo:baz");
+
+        List<Column> columnNames = Arrays.asList(new Column("foo"), new Column(
+                "bar"), new Column("baz"), new Column("foo:bar"), new Column(
+                "foo:baz"));
+
+        Assert.assertEquals(columnNames, storage.columns);
+    }
+
+    @Test
+    public void testAuths() throws ParseException, IOException {
+        AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+                .getAbstractAccumuloStorage("", "--authorizations auth1,auth2");
+
+        Authorizations auths = new Authorizations("auth1,auth2");
+
+        Assert.assertEquals(auths, storage.authorizations);
+
+        storage = TestAbstractAccumuloStorage.getAbstractAccumuloStorage("",
+                "-auths auth1,auth2");
+        Assert.assertEquals(auths, storage.authorizations);
+    }
+
+    @Test
+    public void testStartEndRows() throws ParseException, IOException {
+        AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+                .getAbstractAccumuloStorage("", "--start begin --end finish");
+
+        Assert.assertEquals("begin", storage.start);
+        Assert.assertEquals("finish", storage.end);
+
+        storage = TestAbstractAccumuloStorage.getAbstractAccumuloStorage("",
+                "-s begin -e finish");
+        Assert.assertEquals("begin", storage.start);
+        Assert.assertEquals("finish", storage.end);
+    }
+
+    @Test
+    public void testBatchWriterOptions() throws ParseException, IOException {
+        long buffSize = 1024 * 50;
+        int writeThreads = 8, maxLatency = 30 * 1000;
+
+        AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+                .getAbstractAccumuloStorage("", "--mutation-buffer-size "
+                        + buffSize + " --write-threads " + writeThreads
+                        + " --max-latency " + maxLatency);
+
+        Assert.assertEquals(buffSize, storage.maxMutationBufferSize);
+        Assert.assertEquals(writeThreads, storage.maxWriteThreads);
+        Assert.assertEquals(maxLatency, storage.maxLatency);
+
+        storage = TestAbstractAccumuloStorage.getAbstractAccumuloStorage("",
+                "-buff " + buffSize + " -wt " + writeThreads + " -ml "
+                        + maxLatency);
+
+        Assert.assertEquals(buffSize, storage.maxMutationBufferSize);
+        Assert.assertEquals(writeThreads, storage.maxWriteThreads);
+        Assert.assertEquals(maxLatency, storage.maxLatency);
+    }
+
+    @Test
+    public void testColumnOptions() throws ParseException, IOException {
+        AbstractAccumuloStorage storage = TestAbstractAccumuloStorage
+                .getAbstractAccumuloStorage("foo-bar ",
+                        "--ignore-whitespace false --separator -");
+
+        Assert.assertEquals(2, storage.columns.size());
+
+        Assert.assertEquals("foo", storage.columns.get(0).getColumnFamily());
+        Assert.assertEquals("bar ", storage.columns.get(1).getColumnFamily());
+    }
+}
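
testBatchWriterOptions verifies that --mutation-buffer-size, --write-threads
and --max-latency land in the storage's fields; downstream, values like these
are what Accumulo's BatchWriterConfig carries into AccumuloOutputFormat. A
sketch of that mapping, assuming the stock fluent setters (the actual wiring
inside AbstractAccumuloStorage is not shown in this patch):

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.client.BatchWriterConfig;

    public class WriterOptionsSketch {
        // Illustrative only: maps the parsed storage options onto a
        // BatchWriterConfig of the kind AccumuloOutputFormat consumes
        static BatchWriterConfig toBatchWriterConfig(long bufferBytes,
                int writeThreads, long maxLatencyMs) {
            return new BatchWriterConfig()
                    .setMaxMemory(bufferBytes)         // --mutation-buffer-size
                    .setMaxWriteThreads(writeThreads)  // --write-threads
                    .setMaxLatency(maxLatencyMs, TimeUnit.MILLISECONDS); // --max-latency
        }
    }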