Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/19 01:53:15 UTC

[08/13] git commit: added test cases and cleaned up the Storage classes

added test cases and cleaned up the Storage classes


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/b9f9b912
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/b9f9b912
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/b9f9b912

Branch: refs/heads/1.5
Commit: b9f9b9125634b4a8feb47ed7659fced2c97e781d
Parents: c478e29
Author: Jason Trost <ja...@gmail.com>
Authored: Wed May 16 12:55:07 2012 -0400
Committer: Jason Trost <ja...@gmail.com>
Committed: Wed May 16 12:55:07 2012 -0400

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 302 ++++++++++++++++++
 .../apache/accumulo/pig/AccumuloStorage.java    | 295 ++----------------
 .../accumulo/pig/AccumuloWholeRowStorage.java   | 303 ++-----------------
 .../java/org/apache/accumulo/pig/Utils.java     |  59 ++++
 .../pig/AbstractAccumuloStorageTest.java        | 156 ++++++++++
 .../accumulo/pig/AccumuloStorageTest.java       | 107 +++++++
 .../pig/AccumuloWholeRowStorageTest.java        | 164 ++++++++++
 .../java/org/apache/accumulo/pig/TestUtils.java |  84 +++++
 8 files changed, 931 insertions(+), 539 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
new file mode 100644
index 0000000..9c5ed75
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Accumulo
+ *
+ * Each Key/Value pair is returned as a tuple: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArrays; timestamp is a long.
+ * 
+ * Tuples can be written in 2 forms:
+ *  (key, colfam, colqual, colvis, value)
+ *    OR
+ *  (key, colfam, colqual, value)
+ * 
+ */
+public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface
+{
+    private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
+
+    private Configuration conf;
+    private RecordReader<Key, Value> reader;
+    private RecordWriter<Text, Mutation> writer;
+    
+    String inst;
+    String zookeepers;
+    String user;
+    String password;
+    String table;
+    Text tableName;
+    String auths;
+    Authorizations authorizations;
+    List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
+    
+    String start = null;
+    String end = null;
+    
+    int maxWriteThreads = 10;
+    long maxMutationBufferSize = 10*1000*1000;
+    int maxLatency = 10*1000;
+
+    public AbstractAccumuloStorage(){}
+
+	@Override
+    public Tuple getNext() throws IOException
+    {
+        try
+        {
+            // load the next pair
+            if (!reader.nextKeyValue())
+                return null;
+            
+            Key key = (Key)reader.getCurrentKey();
+            Value value = (Value)reader.getCurrentValue();
+            assert key != null && value != null;
+            return getTuple(key, value);
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e);
+        }
+    }
+	
+	protected abstract Tuple getTuple(Key key, Value value) throws IOException;
+	
+
+    @Override
+    public InputFormat getInputFormat()
+    {
+        return new AccumuloInputFormat();
+    }
+
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    {
+        this.reader = reader;
+    }
+
+    private void setLocationFromUri(String location) throws IOException
+    {
+        // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+        String columns = "";
+        try
+        {
+            if (!location.startsWith("accumulo://"))
+                throw new Exception("Bad scheme.");
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                for (String param : urlParts[1].split("&"))
+                {
+                    String[] pair = param.split("=");
+                    if (pair[0].equals("instance"))
+                        inst = pair[1];
+                    else if (pair[0].equals("user"))
+                        user = pair[1];
+                    else if (pair[0].equals("password"))
+                        password = pair[1];
+                    else if (pair[0].equals("zookeepers"))
+                    	zookeepers = pair[1];
+                    else if (pair[0].equals("auths"))
+                    	auths = pair[1];
+                    else if (pair[0].equals("columns"))
+                    	columns = pair[1];
+                    else if (pair[0].equals("start"))
+                    	start = pair[1];
+                    else if (pair[0].equals("end"))
+                    	end = pair[1];
+                    else if (pair[0].equals("write_buffer_size_bytes"))
+                    	maxMutationBufferSize = Long.parseLong(pair[1]);
+                    else if (pair[0].equals("write_threads"))
+                    	maxWriteThreads = Integer.parseInt(pair[1]);
+                    else if (pair[0].equals("write_latency_ms"))
+                    	maxLatency = Integer.parseInt(pair[1]);
+                }
+            }
+            String[] parts = urlParts[0].split("/+");
+            table = parts[1];
+            tableName = new Text(table);
+            
+            if(auths == null || auths.equals(""))
+            {
+            	authorizations = new Authorizations();
+            }
+            else
+            {
+            	authorizations = new Authorizations(auths.split(","));
+            }
+            
+            if(!columns.equals("")){
+            	for(String cfCq : columns.split(","))
+            	{
+            		if(cfCq.contains("|"))
+            		{
+            			String[] c = cfCq.split("\\|");
+            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(c[0]), new Text(c[1])));
+            		}
+            		else
+            		{
+            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(cfCq), null));
+            		}
+            	}
+            }
+            	
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&" +
+            						"[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': " + e.getMessage());
+        }
+    }
+    
+    protected RecordWriter<Text, Mutation> getWriter() {
+		return writer;
+	}
+    
+    @Override
+    public void setLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+        
+        if(!conf.getBoolean(AccumuloInputFormat.class.getSimpleName()+".configured", false))
+        {
+        	AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
+            AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+            if(columnFamilyColumnQualifierPairs.size() > 0)
+            {
+            	LOG.info("columns: "+columnFamilyColumnQualifierPairs);
+            	AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+            }
+            
+            AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
+            configureInputFormat(conf);
+        }
+    }
+
+    protected void configureInputFormat(Configuration conf)
+    {
+    	
+    }
+    
+    protected void configureOutputFormat(Configuration conf)
+    {
+    	
+    }
+    
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+    {
+        return location;
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        
+    }
+
+    /* StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+        
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+    
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+        
+        if(!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName()+".configured", false))
+        {
+        	AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
+            AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+            AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
+            AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
+            AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
+            configureOutputFormat(conf);
+        }
+    }
+
+    public OutputFormat getOutputFormat()
+    {
+        return new AccumuloOutputFormat();
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types; they all get converted to bytes
+    }
+
+    public void prepareToWrite(RecordWriter writer)
+    {
+        this.writer = writer;
+    }
+    
+    public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
+
+    public void putNext(Tuple tuple) throws ExecException, IOException
+    {
+    	Collection<Mutation> muts = getMutations(tuple);
+    	for(Mutation mut : muts)
+    	{
+    		try {
+    			getWriter().write(tableName, mut);
+    		} catch (InterruptedException e) {
+    			throw new IOException(e);
+    		}
+    	}
+    }
+
+    public void cleanupOnFailure(String failure, Job job){}
+}
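
The refactoring here is a template-method split: AbstractAccumuloStorage keeps all of the shared LoadFunc/StoreFuncInterface plumbing (URI parsing, input/output format setup, the read and write loops), and concrete classes supply getTuple(Key, Value) for loads, getMutations(Tuple) for stores, and optionally the configureInputFormat/configureOutputFormat hooks. A minimal sketch of a concrete subclass (the class name and tuple layout below are hypothetical, for illustration only; not part of this commit):

    package org.apache.accumulo.pig;

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Hypothetical example subclass, for illustration only.
    public class SimpleAccumuloStorage extends AbstractAccumuloStorage {

        @Override
        protected Tuple getTuple(Key key, Value value) throws IOException {
            // read side: map one Key/Value pair to a (row, value) tuple
            Tuple tuple = TupleFactory.getInstance().newTuple(2);
            tuple.set(0, key.getRow().toString());
            tuple.set(1, new String(value.get()));
            return tuple;
        }

        @Override
        public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
            // write side: map a (row, colfam, colqual, value) tuple to one Mutation
            Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
            mut.put(Utils.objToText(tuple.get(1)), Utils.objToText(tuple.get(2)),
                    new Value(Utils.objToBytes(tuple.get(3))));
            return Collections.singleton(mut);
        }
    }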

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 0db6886..0803aa6 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -17,34 +17,17 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
 
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -60,218 +43,35 @@ import org.apache.pig.data.TupleFactory;
  *  (key, colfam, colqual, value)
  * 
  */
-public class AccumuloStorage extends LoadFunc implements StoreFuncInterface
+public class AccumuloStorage extends AbstractAccumuloStorage
 {
-    private static final Log logger = LogFactory.getLog(AccumuloStorage.class);
-
-    private Configuration conf;
-    private RecordReader<Key, Value> reader;
-    private RecordWriter<Text, Mutation> writer;
-    
-    String inst;
-    String zookeepers;
-    String user;
-    String password;
-    String table;
-    Text tableName;
-    String auths;
-    Authorizations authorizations;
-    List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-    
-    String start = null;
-    String end = null;
+    private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
 
     public AccumuloStorage(){}
 
 	@Override
-    public Tuple getNext() throws IOException
-    {
-        try
-        {
-            // load the next pair
-            if (!reader.nextKeyValue())
-                return null;
-            
-            Key key = (Key)reader.getCurrentKey();
-            Value value = (Value)reader.getCurrentValue();
-            assert key != null && value != null;
-            
-            // and wrap it in a tuple
-	        Tuple tuple = TupleFactory.getInstance().newTuple(6);
-            tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-            tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-            tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-            tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-            tuple.set(4, new Long(key.getTimestamp()));
-            tuple.set(5, new DataByteArray(value.get()));
-            return tuple;
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e.getMessage());
-        }
-    }    
-
-    @Override
-    public InputFormat getInputFormat()
-    {
-        return new AccumuloInputFormat();
-    }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        this.reader = reader;
-    }
-
-    private void setLocationFromUri(String location) throws IOException
-    {
-        // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
-        String names[];
-        String columns = "";
-        try
-        {
-            if (!location.startsWith("accumulo://"))
-                throw new Exception("Bad scheme.");
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                for (String param : urlParts[1].split("&"))
-                {
-                    String[] pair = param.split("=");
-                    if (pair[0].equals("instance"))
-                        inst = pair[1];
-                    else if (pair[0].equals("user"))
-                        user = pair[1];
-                    else if (pair[0].equals("password"))
-                        password = pair[1];
-                    else if (pair[0].equals("zookeepers"))
-                    	zookeepers = pair[1];
-                    else if (pair[0].equals("auths"))
-                    	auths = pair[1];
-                    else if (pair[0].equals("columns"))
-                    	columns = pair[1];
-                    else if (pair[0].equals("start"))
-                    	start = pair[1];
-                    else if (pair[0].equals("end"))
-                    	end = pair[1];
-                }
-            }
-            String[] parts = urlParts[0].split("/+");
-            table = parts[1];
-            tableName = new Text(table);
-            
-            if(auths == null || auths.equals(""))
-            {
-            	authorizations = new Authorizations();
-            }
-            else
-            {
-            	authorizations = new Authorizations(auths.split(","));
-            }
-            
-            if(!columns.equals("")){
-            	for(String cfCq : columns.split(","))
-            	{
-            		if(cfCq.contains("|"))
-            		{
-            			String[] c = cfCq.split("\\|");
-            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(c[0]), new Text(c[1])));
-            		}
-            		else
-            		{
-            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(cfCq), null));
-            		}
-            	}
-            }
-            	
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...]]]': " + e.getMessage());
-        }
-    }
-
-    @Override
-    public void setLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-        
-        if(!conf.getBoolean(AccumuloInputFormat.class.getSimpleName()+".configured", false))
-        {
-        	AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-            AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-            if(columnFamilyColumnQualifierPairs.size() > 0)
-            	AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
-            
-            AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
-        }
-    }
-
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
-    {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature)
-    {
-        
-    }
-
-    /* StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
-    }
-    
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-        
-        if(!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName()+".configured", false))
-        {
-        	AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-            AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-            AccumuloOutputFormat.setMaxLatency(conf, 10*1000);
-            AccumuloOutputFormat.setMaxMutationBufferSize(conf, 10*1000*1000);
-            AccumuloOutputFormat.setMaxWriteThreads(conf, 10);
-        }
-    }
-
-    public OutputFormat getOutputFormat()
-    {
-        return new AccumuloOutputFormat();
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // we don't care about types, they all get casted to ByteBuffers
-    }
-
-    public void prepareToWrite(RecordWriter writer)
-    {
-        this.writer = writer;
-    }
-
-    public void putNext(Tuple t) throws ExecException, IOException
-    {
-        Mutation mut = new Mutation(objToText(t.get(0)));
-        Text cf = objToText(t.get(1));
-    	Text cq = objToText(t.get(2));
+	protected Tuple getTuple(Key key, Value value) throws IOException {
+		// and wrap it in a tuple
+        Tuple tuple = TupleFactory.getInstance().newTuple(6);
+        tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+        tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
+        tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
+        tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
+        tuple.set(4, new Long(key.getTimestamp()));
+        tuple.set(5, new DataByteArray(value.get()));
+        return tuple;
+	}
+	
+	@Override
+	public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+		Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+        Text cf = Utils.objToText(tuple.get(1));
+    	Text cq = Utils.objToText(tuple.get(2));
     	
-        if(t.size() > 4)
+        if(tuple.size() > 4)
         {
-        	Text cv = objToText(t.get(3));
-        	Value val = new Value(objToBytes(t.get(4)));
+        	Text cv = Utils.objToText(tuple.get(3));
+        	Value val = new Value(Utils.objToBytes(tuple.get(4)));
         	if(cv.getLength() == 0)
         	{
         		mut.put(cf, cq, val);
@@ -283,53 +83,10 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface
         }
         else
         {
-        	Value val = new Value(objToBytes(t.get(3)));
+        	Value val = new Value(Utils.objToBytes(tuple.get(3)));
         	mut.put(cf, cq, val);
         }
         
-        try {
-			writer.write(tableName, mut);
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		}
-    }
-    
-    private static Text objToText(Object o)
-    {
-    	return new Text(objToBytes(o));
-    }
-    
-    private static byte[] objToBytes(Object o)
-    {
-    	if (o instanceof String) {
-			String str = (String) o;
-			return str.getBytes();
-		}
-    	else if (o instanceof Long) {
-			Long l = (Long) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Integer) {
-    		Integer l = (Integer) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Boolean) {
-    		Boolean l = (Boolean) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Float) {
-    		Float l = (Float) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Double) {
-    		Double l = (Double) o;
-			return l.toString().getBytes();
-		}
-    	
-    	// TODO: handle DataBag, Map<Object, Object>, and Tuple
-    	
-    	return ((DataByteArray)o).get();
-    }
-
-    public void cleanupOnFailure(String failure, Job job){}
+        return Collections.singleton(mut);
+	}
 }
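
As the javadoc notes, tuples can be stored in two shapes: (key, colfam, colqual, colvis, value) or (key, colfam, colqual, value), where the 4-field form writes the update without a column visibility. A short sketch exercising getMutations() with both shapes (the class name and literal values are arbitrary, for illustration only):

    import java.util.Collection;

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.pig.AccumuloStorage;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class AccumuloStorageExample {
        public static void main(String[] args) throws Exception {
            AccumuloStorage storage = new AccumuloStorage();
            TupleFactory tf = TupleFactory.getInstance();

            // 4-field form: no column visibility
            Tuple four = tf.newTuple(4);
            four.set(0, "row1");
            four.set(1, "cf");
            four.set(2, "cq");
            four.set(3, "val");
            Collection<Mutation> a = storage.getMutations(four);  // one Mutation, one update

            // 5-field form: column visibility in slot 3
            Tuple five = tf.newTuple(5);
            five.set(0, "row1");
            five.set(1, "cf");
            five.set(2, "cq");
            five.set(3, "public");
            five.set(4, "val");
            Collection<Mutation> b = storage.getMutations(five);  // update labelled "public"
        }
    }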

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index c225e38..a959dda 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -18,38 +18,24 @@ package org.apache.accumulo.pig;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
@@ -66,67 +52,35 @@ import org.apache.pig.data.TupleFactory;
  *  (key, colfam, colqual, value)
  * 
  */
-public class AccumuloWholeRowStorage extends LoadFunc implements StoreFuncInterface
+public class AccumuloWholeRowStorage extends AbstractAccumuloStorage
 {
     private static final Log LOG = LogFactory.getLog(AccumuloWholeRowStorage.class);
 
-    private Configuration conf;
-    private RecordReader<Key, Value> reader;
-    private RecordWriter<Text, Mutation> writer;
-    
-    String inst;
-    String zookeepers;
-    String user;
-    String password;
-    String table;
-    Text tableName;
-    String auths;
-    Authorizations authorizations;
-    List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-    
-    String start = null;
-    String end = null;
-
     public AccumuloWholeRowStorage(){}
 
 	@Override
-    public Tuple getNext() throws IOException
-    {
-        try
+	protected Tuple getTuple(Key key, Value value) throws IOException {
+		
+		SortedMap<Key, Value> rowKVs = WholeRowIterator.decodeRow(key, value);
+        List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
+        for(Entry<Key, Value> e : rowKVs.entrySet())
         {
-            // load the next pair
-            if (!reader.nextKeyValue())
-                return null;
-            
-            Key key = (Key)reader.getCurrentKey();
-            Value value = (Value)reader.getCurrentValue();
-            assert key != null && value != null;
-            
-            SortedMap<Key, Value> rowKVs =  WholeRowIterator.decodeRow(key, value);
-            List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
-            for(Entry<Key, Value> e : rowKVs.entrySet())
-            {
-            	columns.add(columnToTuple(
-            			e.getKey().getColumnFamily(), 
-            			e.getKey().getColumnQualifier(), 
-            			e.getKey().getColumnVisibility(), 
-            			e.getKey().getTimestamp(), 
-            			e.getValue())
-            		);
-            }
-            
-            // and wrap it in a tuple
-	        Tuple tuple = TupleFactory.getInstance().newTuple(2);
-            tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-            tuple.set(1, new DefaultDataBag(columns));
-            
-            return tuple;
+        	columns.add(columnToTuple(
+        			e.getKey().getColumnFamily(), 
+        			e.getKey().getColumnQualifier(), 
+        			e.getKey().getColumnVisibility(), 
+        			e.getKey().getTimestamp(), 
+        			e.getValue())
+        		);
         }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e.getMessage());
-        }
-    }    
+        
+        // and wrap it in a tuple
+        Tuple tuple = TupleFactory.getInstance().newTuple(2);
+        tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+        tuple.set(1, new DefaultDataBag(columns));
+        
+        return tuple;
+	}
 
 	private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException
     {
@@ -139,218 +93,27 @@ public class AccumuloWholeRowStorage extends LoadFunc implements StoreFuncInterf
         return tuple;
     }
 	
-    @Override
-    public InputFormat getInputFormat()
-    {
-        return new AccumuloInputFormat();
-    }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        this.reader = reader;
-    }
-
-    private void setLocationFromUri(String location) throws IOException
-    {
-        // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1:cq1,col2:cq2&start=abc&end=z
-        String names[];
-        String columns = "";
-        try
-        {
-            if (!location.startsWith("accumulo://"))
-                throw new Exception("Bad scheme.");
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                for (String param : urlParts[1].split("&"))
-                {
-                    String[] pair = param.split("=");
-                    if (pair[0].equals("instance"))
-                        inst = pair[1];
-                    else if (pair[0].equals("user"))
-                        user = pair[1];
-                    else if (pair[0].equals("password"))
-                        password = pair[1];
-                    else if (pair[0].equals("zookeepers"))
-                    	zookeepers = pair[1];
-                    else if (pair[0].equals("auths"))
-                    	auths = pair[1];
-                    else if (pair[0].equals("columns"))
-                    	columns = pair[1];
-                    else if (pair[0].equals("start"))
-                    	start = pair[1];
-                    else if (pair[0].equals("end"))
-                    	end = pair[1];
-                }
-            }
-            String[] parts = urlParts[0].split("/+");
-            table = parts[1];
-            tableName = new Text(table);
-            
-            if(auths == null || auths.equals(""))
-            {
-            	authorizations = new Authorizations();
-            }
-            else
-            {
-            	authorizations = new Authorizations(auths.split(","));
-            }
-            
-            if(!columns.equals("")){
-            	for(String cfCq : columns.split(","))
-            	{
-            		if(cfCq.contains("|"))
-            		{
-            			String[] c = cfCq.split("\\|");
-            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(c[0]), new Text(c[1])));
-            		}
-            		else
-            		{
-            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(cfCq), null));
-            		}
-            	}
-            }
-            	
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[start=startRow,end=endRow,columns=[cf1:cq1,cf2:cq2,...]]]': " + e.getMessage());
-        }
-    }
-
-    @Override
-    public void setLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-        
-        if(!conf.getBoolean(AccumuloInputFormat.class.getSimpleName()+".configured", false))
-        {
-        	AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-            AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-            if(columnFamilyColumnQualifierPairs.size() > 0)
-            {
-            	LOG.info("columns: "+columnFamilyColumnQualifierPairs);
-            	AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
-            }
-            
-            AccumuloInputFormat.setRanges(conf, Collections.singleton(new Range(start, end)));
-            AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
-        }
-    }
-
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+    protected void configureInputFormat(Configuration conf)
     {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature)
-    {
-        
-    }
-
-    /* StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
+    	AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
     }
     
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-        
-        if(!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName()+".configured", false))
-        {
-        	AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-            AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-            AccumuloOutputFormat.setMaxLatency(conf, 10*1000);
-            AccumuloOutputFormat.setMaxMutationBufferSize(conf, 10*1000*1000);
-            AccumuloOutputFormat.setMaxWriteThreads(conf, 10);
-        }
-    }
-
-    public OutputFormat getOutputFormat()
-    {
-        return new AccumuloOutputFormat();
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // we don't care about types, they all get casted to ByteBuffers
-    }
-
-    public void prepareToWrite(RecordWriter writer)
-    {
-        this.writer = writer;
-    }
-
-    public void putNext(Tuple tuple) throws ExecException, IOException
-    {
-        Mutation mut = new Mutation(objToText(tuple.get(0)));
+    @Override
+    public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    	
+    	Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
         DefaultDataBag columns = (DefaultDataBag)tuple.get(1);
         for(Tuple column : columns)
         {
-        	Text cf = objToText(column.get(0));
-        	Text cq = objToText(column.get(1));
-        	Text cv = objToText(column.get(2));
+        	Text cf = Utils.objToText(column.get(0));
+        	Text cq = Utils.objToText(column.get(1));
+        	Text cv = Utils.objToText(column.get(2));
         	Long ts = (Long)column.get(3);
-        	Value val = new Value(objToBytes(column.get(4)));
+        	Value val = new Value(Utils.objToBytes(column.get(4)));
         	
         	mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
         }
-        
-        try {
-			writer.write(tableName, mut);
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		}
-    }
-    
-    private static Text objToText(Object o)
-    {
-    	return new Text(objToBytes(o));
-    }
-        
-    private static byte[] objToBytes(Object o)
-    {
-    	if (o instanceof String) {
-			String str = (String) o;
-			return str.getBytes();
-		}
-    	else if (o instanceof Long) {
-			Long l = (Long) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Integer) {
-    		Integer l = (Integer) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Boolean) {
-    		Boolean l = (Boolean) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Float) {
-    		Float l = (Float) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Double) {
-    		Double l = (Double) o;
-			return l.toString().getBytes();
-		}
     	
-    	// TODO: handle DataBag, Map<Object, Object>, and Tuple
-    	
-    	return ((DataByteArray)o).get();
+    	return Collections.singleton(mut);
     }
-
-    public void cleanupOnFailure(String failure, Job job){}
 }
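
The whole-row variant expects a different store-side shape: slot 0 holds the row and slot 1 holds a bag of (colfam, colqual, colvis, timestamp, value) tuples, all of which land in a single Mutation. A sketch of building such a tuple by hand (the class name and values are arbitrary, for illustration only):

    import java.util.Collection;

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.pig.AccumuloWholeRowStorage;
    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.DefaultDataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class WholeRowExample {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            // one column: (colfam, colqual, colvis, timestamp, value)
            Tuple col = tf.newTuple(5);
            col.set(0, new DataByteArray("cf1"));
            col.set(1, new DataByteArray("cq1"));
            col.set(2, new DataByteArray("public"));
            col.set(3, 42L);  // must be a Long; getMutations() casts slot 3
            col.set(4, new DataByteArray("val1"));

            DefaultDataBag columns = new DefaultDataBag();
            columns.add(col);

            // outer tuple: (row, bag of columns)
            Tuple row = tf.newTuple(2);
            row.set(0, "row1");
            row.set(1, columns);

            // one Mutation carrying one timestamped, visibility-labelled update
            Collection<Mutation> muts = new AccumuloWholeRowStorage().getMutations(row);
        }
    }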

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/main/java/org/apache/accumulo/pig/Utils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
new file mode 100644
index 0000000..e43c078
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import org.apache.hadoop.io.Text;
+import org.apache.pig.data.DataByteArray;
+
+public class Utils {
+	public static Text objToText(Object o)
+    {
+    	return new Text(objToBytes(o));
+    }
+    
+    public static byte[] objToBytes(Object o)
+    {
+    	if (o instanceof String) {
+			String str = (String) o;
+			return str.getBytes();
+		}
+    	else if (o instanceof Long) {
+			Long l = (Long) o;
+			return l.toString().getBytes();
+		}
+    	else if (o instanceof Integer) {
+    		Integer l = (Integer) o;
+			return l.toString().getBytes();
+		}
+    	else if (o instanceof Boolean) {
+    		Boolean l = (Boolean) o;
+			return l.toString().getBytes();
+		}
+    	else if (o instanceof Float) {
+    		Float l = (Float) o;
+			return l.toString().getBytes();
+		}
+    	else if (o instanceof Double) {
+    		Double l = (Double) o;
+			return l.toString().getBytes();
+		}
+    	
+    	// TODO: handle DataBag, Map<Object, Object>, and Tuple
+    	
+    	return ((DataByteArray)o).get();
+    }
+}
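
The conversion rules are deliberately simple: the common Pig scalars (String, Long, Integer, Boolean, Float, Double) are rendered to their string bytes, and anything else is assumed to already be a DataByteArray; per the TODO, DataBag, Map, and Tuple inputs are not handled yet and will fail the final cast. A quick illustration (the class name is hypothetical):

    import org.apache.accumulo.pig.Utils;
    import org.apache.pig.data.DataByteArray;

    public class UtilsExample {
        public static void main(String[] args) {
            byte[] a = Utils.objToBytes("abc");                     // "abc".getBytes()
            byte[] b = Utils.objToBytes(Long.valueOf(7));           // "7".getBytes()
            byte[] c = Utils.objToBytes(new DataByteArray("raw"));  // underlying bytes, unchanged
            // an unhandled type (e.g. a Tuple) falls through to the
            // DataByteArray cast and throws ClassCastException
        }
    }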

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
new file mode 100644
index 0000000..402c74a
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+public class AbstractAccumuloStorageTest {
+	
+	public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table,
+			String start, String end, Authorizations authorizations, List<Pair<Text, Text>> columnFamilyColumnQualifierPairs) throws IOException
+	{
+	    Collection<Range> ranges = new LinkedList<Range>();
+	    ranges.add(new Range(start, end));
+	    
+		Job expected = new Job();
+		Configuration expectedConf = expected.getConfiguration();
+		AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
+        AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+        AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
+        AccumuloInputFormat.setRanges(expectedConf, ranges);
+        return expected;
+	}
+	
+	public Job getDefaultExpectedLoadJob() throws IOException
+	{
+		String inst = "myinstance";
+	    String zookeepers = "127.0.0.1:2181";
+	    String user = "root";
+	    String password = "secret";
+	    String table = "table1";
+	    String start = "abc";
+	    String end = "z";
+	    Authorizations authorizations = new Authorizations("PRIVATE,PUBLIC".split(","));
+	    
+	    List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
+	    columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col1"), new Text("cq1")));
+	    columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col2"), new Text("cq2")));
+	    columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col3"), null));
+	    
+		Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+		return expected;
+	}
+	
+	public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads, int maxWriteLatencyMS) throws IOException
+	{
+		Job expected = new Job();
+		Configuration expectedConf = expected.getConfiguration();
+		AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
+        AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+        AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
+        AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
+        AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+        
+        return expected;
+	}
+	
+	public Job getDefaultExpectedStoreJob() throws IOException
+	{
+		String inst = "myinstance";
+	    String zookeepers = "127.0.0.1:2181";
+	    String user = "root";
+	    String password = "secret";
+	    String table = "table1";
+	    long maxWriteBufferSize = 1234000;
+	    int writeThreads = 7;
+	    int maxWriteLatencyMS = 30000;
+	    
+		Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+		return expected;
+	}
+	
+	public String getDefaultLoadLocation()
+	{
+		return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
+	}
+	
+	public String getDefaultStoreLocation()
+	{
+		return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
+	}
+	
+	public AbstractAccumuloStorage getAbstractAccumuloStorage()
+	{
+		AbstractAccumuloStorage s = new AbstractAccumuloStorage() {
+			
+			@Override
+			public Collection<Mutation> getMutations(Tuple tuple) {return null;}
+
+			@Override
+			protected Tuple getTuple(Key key, Value value) throws IOException {return null;}
+		};
+		return s;
+	}
+	
+	
+	@Test
+	public void testSetLoadLocation() throws IOException
+	{
+		AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+		
+		Job actual = new Job();
+		s.setLocation(getDefaultLoadLocation(), actual);
+		Configuration actualConf = actual.getConfiguration();
+		
+		Job expected = getDefaultExpectedLoadJob();
+		Configuration expectedConf = expected.getConfiguration();
+		
+		TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+	}
+	
+	@Test
+	public void testSetStoreLocation() throws IOException
+	{
+		AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+		
+		Job actual = new Job();
+		s.setStoreLocation(getDefaultStoreLocation(), actual);
+		Configuration actualConf = actual.getConfiguration();
+		
+		Job expected = getDefaultExpectedStoreJob();
+		Configuration expectedConf = expected.getConfiguration();
+		
+		TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+	}
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
new file mode 100644
index 0000000..dd45a1a
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Test;
+
+public class AccumuloStorageTest {
+	
+	@Test
+	public void testGetMutations4() throws Exception
+	{
+		AccumuloStorage s = new AccumuloStorage();
+		
+		Tuple tuple = TupleFactory.getInstance().newTuple(4);
+		tuple.set(0, "row1");
+		tuple.set(1, "cf1");
+		tuple.set(2, "cq1");
+		tuple.set(3, "val1");
+		
+		Collection<Mutation> muts = s.getMutations(tuple);
+		
+		assertNotNull(muts);
+		assertEquals(1, muts.size());
+		Mutation mut = muts.iterator().next();
+		List<ColumnUpdate> updates = mut.getUpdates();
+		assertEquals(1, updates.size());
+		ColumnUpdate update = updates.get(0);
+		
+		assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
+		assertTrue(Arrays.equals(((String)tuple.get(1)).getBytes(), update.getColumnFamily()));
+		assertTrue(Arrays.equals(((String)tuple.get(2)).getBytes(), update.getColumnQualifier()));
+		assertTrue(Arrays.equals(((String)tuple.get(3)).getBytes(), update.getValue()));
+		assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
+	}
+	
+	@Test
+	public void testGetMutations5() throws Exception
+	{
+		AccumuloStorage s = new AccumuloStorage();
+		
+		Tuple tuple = TupleFactory.getInstance().newTuple(5);
+		tuple.set(0, "row1");
+		tuple.set(1, "cf1");
+		tuple.set(2, "cq1");
+		tuple.set(3, "cv1");
+		tuple.set(4, "val1");
+		
+		Collection<Mutation> muts = s.getMutations(tuple);
+		
+		assertNotNull(muts);
+		assertEquals(1, muts.size());
+		Mutation mut = muts.iterator().next();
+		List<ColumnUpdate> updates = mut.getUpdates();
+		assertEquals(1, updates.size());
+		ColumnUpdate update = updates.get(0);
+		
+		assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
+		assertTrue(Arrays.equals(((String)tuple.get(1)).getBytes(), update.getColumnFamily()));
+		assertTrue(Arrays.equals(((String)tuple.get(2)).getBytes(), update.getColumnQualifier()));
+		assertTrue(Arrays.equals(((String)tuple.get(3)).getBytes(), update.getColumnVisibility()));
+		assertTrue(Arrays.equals(((String)tuple.get(4)).getBytes(), update.getValue()));
+	}
+	
+	@Test
+	public void testGetTuple() throws Exception
+	{
+		AccumuloStorage s = new AccumuloStorage();
+		
+		Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
+		Value value = new Value("val1".getBytes());
+		Tuple tuple = s.getTuple(key, value);
+		TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+		
+		key = new Key("row1", "cf1", "cq1");
+		value = new Value("val1".getBytes());
+		tuple = s.getTuple(key, value);
+		TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+	}
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
new file mode 100644
index 0000000..750ebea
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Test;
+
+public class AccumuloWholeRowStorageTest {
+	
+	@Test
+	public void testConfiguration() throws IOException
+	{
+		AbstractAccumuloStorageTest test = new AbstractAccumuloStorageTest();
+		
+		AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+		
+		Job actual = new Job();
+		s.setLocation(test.getDefaultLoadLocation(), actual);
+		Configuration actualConf = actual.getConfiguration();
+		
+		Job expected =  test.getDefaultExpectedLoadJob();
+		Configuration expectedConf = expected.getConfiguration();
+		AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
+		
+		TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+	}
+	
+	public static Tuple generateTuple(String cf, String cq, String cv, Long ts, String val) throws ExecException
+	{
+		Tuple tuple = TupleFactory.getInstance().newTuple(5);
+		tuple.set(0, new DataByteArray(cf.getBytes()));
+		tuple.set(1, new DataByteArray(cq.getBytes()));
+		tuple.set(2, new DataByteArray(cv.getBytes()));
+		tuple.set(3, ts);
+		tuple.set(4, new DataByteArray(val.getBytes()));
+		return tuple;
+	}
+	
+	@Test
+	public void testGetMutations() throws Exception
+	{
+		Tuple tuple = TupleFactory.getInstance().newTuple(2);
+		tuple.set(0, "row1");
+		
+		DefaultDataBag bag = new DefaultDataBag();
+		bag.add(generateTuple("cf1", "cq1", "cv1", 1L, "val1"));
+		bag.add(generateTuple("cf2", "cq2", "cv2", 2L, "val2"));
+		bag.add(generateTuple("cf3", "cq3", "cv3", 3L, "val3"));
+		tuple.set(1, bag);
+		
+		AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+		Collection<Mutation> muts = s.getMutations(tuple);
+		
+		assertNotNull(muts);
+		assertEquals(1, muts.size());
+		Mutation mut = muts.iterator().next();
+		
+		List<ColumnUpdate> updates = mut.getUpdates();
+		assertEquals(3, updates.size());
+		
+		assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
+		
+		Iterator<Tuple> iter = bag.iterator();
+		for(ColumnUpdate update : updates)
+		{
+			Tuple colTuple = iter.next();
+			
+			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(0)).get(), update.getColumnFamily()));
+			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(1)).get(), update.getColumnQualifier()));
+			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(2)).get(), update.getColumnVisibility()));
+			assertEquals(((Long)colTuple.get(3)).longValue(), update.getTimestamp());
+			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(4)).get(), update.getValue()));
+		}
+	}
+	
+	@Test
+	public void testGetTuple() throws Exception
+	{
+		AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+		
+		Key key = new Key("row");
+		
+		List<Key> keys = new ArrayList<Key>(3);
+		keys.add(new Key("row", "cf1", "cf1", "cv1", 1L));
+		keys.add(new Key("row", "cf2", "cf2", "cv2", 2L));
+		keys.add(new Key("row", "cf3", "cf3", "cv3", 3L));
+		
+		List<Value> values = new ArrayList<Value>(3);
+		values.add(new Value("1".getBytes()));
+		values.add(new Value("2".getBytes()));
+		values.add(new Value("3".getBytes()));
+		
+		Value value = WholeRowIterator.encodeRow(keys, values);
+		
+		List<Tuple> columns = new LinkedList<Tuple>();
+		for(int i = 0; i < keys.size(); ++i)
+		{
+			columns.add(columnToTuple(
+							keys.get(i).getColumnFamily().toString(), 
+							keys.get(i).getColumnQualifier().toString(), 
+							keys.get(i).getColumnVisibility().toString(), 
+							keys.get(i).getTimestamp(), 
+							new String(values.get(i).get())));
+		}
+		
+		Tuple tuple = TupleFactory.getInstance().newTuple(2);
+        tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+        tuple.set(1, new DefaultDataBag(columns));
+        
+		TestUtils.assertWholeRowKeyValueEqualsTuple(key, value, tuple);
+	}
+	
+	private Tuple columnToTuple(String colfam, String colqual, String colvis, long ts, String val) throws IOException
+    {
+        Tuple tuple = TupleFactory.getInstance().newTuple(5);
+        tuple.set(0, new DataByteArray(colfam.getBytes()));
+        tuple.set(1, new DataByteArray(colqual.getBytes()));
+        tuple.set(2, new DataByteArray(colvis.getBytes()));
+        tuple.set(3, new Long(ts));
+        tuple.set(4, new DataByteArray(val.getBytes()));
+        return tuple;
+    }
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/b9f9b912/src/test/java/org/apache/accumulo/pig/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/TestUtils.java b/src/test/java/org/apache/accumulo/pig/TestUtils.java
new file mode 100644
index 0000000..5a9019b
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/TestUtils.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+
+public class TestUtils {
+	public static void assertConfigurationsEqual(Configuration expectedConf, Configuration actualConf)
+	{
+		// For every key in expectedConf, make sure both confs hold the same value
+        Iterator<Entry<String, String>> expectedIter = expectedConf.iterator();
+        while(expectedIter.hasNext())
+        {
+        	Entry<String, String> e = expectedIter.next();
+        	assertEquals(expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
+        }
+        
+        // ...and for every key in actualConf, make sure both confs hold the same value
+        Iterator<Entry<String, String>> actualIter = actualConf.iterator();
+        while(actualIter.hasNext())
+        {
+        	Entry<String, String> e = actualIter.next();
+        	assertEquals(expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
+        }
+	}
+	
+	public static void assertKeyValueEqualsTuple(Key key, Value value, Tuple tuple) throws ExecException
+	{
+		assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray)tuple.get(0)).get()));
+		assertTrue(Arrays.equals(key.getColumnFamily().getBytes(), ((DataByteArray)tuple.get(1)).get()));
+		assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(), ((DataByteArray)tuple.get(2)).get()));
+		assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(), ((DataByteArray)tuple.get(3)).get()));
+		assertEquals(key.getTimestamp(), ((Long)tuple.get(4)).longValue());
+		assertTrue(Arrays.equals(value.get(), ((DataByteArray)tuple.get(5)).get()));
+	}
+	
+	public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value, Tuple mainTuple) throws IOException
+	{
+		assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray)mainTuple.get(0)).get()));
+		
+		DefaultDataBag bag = (DefaultDataBag)mainTuple.get(1);
+		Iterator<Tuple> iter = bag.iterator();		
+		
+		for(Entry<Key, Value> e : WholeRowIterator.decodeRow(key, value).entrySet())
+		{
+			Tuple tuple = iter.next();
+			
+			assertTrue(Arrays.equals(e.getKey().getColumnFamily().getBytes(), ((DataByteArray)tuple.get(0)).get()));
+			assertTrue(Arrays.equals(e.getKey().getColumnQualifier().getBytes(), ((DataByteArray)tuple.get(1)).get()));
+			assertTrue(Arrays.equals(e.getKey().getColumnVisibility().getBytes(), ((DataByteArray)tuple.get(2)).get()));
+			assertEquals(e.getKey().getTimestamp(), ((Long)tuple.get(3)).longValue());
+			assertTrue(Arrays.equals(e.getValue().get(), ((DataByteArray)tuple.get(4)).get()));
+		}
+	}
+}