You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/10/19 01:53:18 UTC

[11/13] git commit: ACCUMULO-1783 Update accumulo dependency to 1.5.0 and reformat code

ACCUMULO-1783 Update accumulo dependency to 1.5.0 and reformat code


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/6d494666
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/6d494666
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/6d494666

Branch: refs/heads/1.5
Commit: 6d4946661b722205b824bd72e174cdc10ecdd165
Parents: 9d9c5fa
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 17 15:27:59 2013 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Oct 17 15:27:59 2013 -0400

----------------------------------------------------------------------
 pom.xml                                         | 101 ++--
 .../accumulo/pig/AbstractAccumuloStorage.java   | 472 +++++++++----------
 .../apache/accumulo/pig/AccumuloStorage.java    | 107 ++---
 .../accumulo/pig/AccumuloWholeRowStorage.java   | 122 +++--
 .../java/org/apache/accumulo/pig/Utils.java     |  68 ++-
 .../pig/AbstractAccumuloStorageTest.java        | 229 +++++----
 .../accumulo/pig/AccumuloStorageTest.java       | 139 +++---
 .../pig/AccumuloWholeRowStorageTest.java        | 218 ++++-----
 .../java/org/apache/accumulo/pig/TestUtils.java |  88 ++--
 9 files changed, 718 insertions(+), 826 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0b36bbf..097d8de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,20 +1,14 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+  license agreements. See the NOTICE file distributed with this work for additional 
+  information regarding copyright ownership. The ASF licenses this file to 
+  You under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
@@ -32,62 +26,49 @@
       <artifactId>log4j</artifactId>
       <version>1.2.16</version>
     </dependency>
-
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
       <version>1.1.1</version>
     </dependency>
-
     <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
       <version>0.11.0</version>
     </dependency>
-
     <!-- Needed by Apache Pig 0.11.0 -->
     <dependency>
-        <groupId>joda-time</groupId>
-        <artifactId>joda-time</artifactId>
-        <version>1.6</version>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>1.6</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+      <version>1.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.9.0</version>
     </dependency>
-
-	<dependency>
-        <groupId>org.apache.accumulo</groupId>
-        <artifactId>accumulo-core</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>zookeeper</artifactId>
-        <version>3.3.1</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.thrift</groupId>
-        <artifactId>libthrift</artifactId>
-        <version>0.9.0</version>
-      </dependency>
-<!--
-      <dependency>
-        <groupId>org.apache.accumulo</groupId>
-        <artifactId>cloudtrace</artifactId>
-    <version>1.4.0</version>
-      </dependency> 
--->
-
-
   </dependencies>
 
-<build>
- <plugins>
-<plugin>
-   <groupId>org.apache.maven.plugins</groupId>
-   <artifactId>maven-compiler-plugin</artifactId>
-   <configuration>
-      <source>1.6</source>
-      <target>1.6</target>
-   </configuration>
-</plugin>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
 
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
@@ -97,7 +78,7 @@
           </descriptorRefs>
           <archive>
             <manifest>
-              <mainClass/>
+              <mainClass />
             </manifest>
           </archive>
         </configuration>
@@ -113,8 +94,8 @@
       </plugin>
 
 
- </plugins>
-</build>
+    </plugins>
+  </build>
 
 
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 82329d3..9612d15 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -54,270 +54,236 @@ import org.apache.pig.data.Tuple;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long.
  * 
- * Tuples can be written in 2 forms:
- *  (key, colfam, colqual, colvis, value)
- *    OR
- *  (key, colfam, colqual, value)
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ * 
+ * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
  * 
  */
-public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface
-{
-    private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
-
-    private Configuration conf;
-    private RecordReader<Key, Value> reader;
-    private RecordWriter<Text, Mutation> writer;
-    
-    String inst;
-    String zookeepers;
-    String user;
-    String password;
-    String table;
-    Text tableName;
-    String auths;
-    Authorizations authorizations;
-    List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-    
-    String start = null;
-    String end = null;
-    
-    int maxWriteThreads = 10;
-    long maxMutationBufferSize = 10*1000*1000;
-    int maxLatency = 10*1000;
-
-    public AbstractAccumuloStorage(){}
-
-	@Override
-    public Tuple getNext() throws IOException
-    {
-        try
-        {
-            // load the next pair
-            if (!reader.nextKeyValue())
-                return null;
-            
-            Key key = (Key)reader.getCurrentKey();
-            Value value = (Value)reader.getCurrentValue();
-            assert key != null && value != null;
-            return getTuple(key, value);
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e.getMessage());
-        }
-    }
-	
-	protected abstract Tuple getTuple(Key key, Value value) throws IOException;
-	
-
-    @Override
-    public InputFormat getInputFormat()
-    {
-        return new AccumuloInputFormat();
+public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface {
+  private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
+  
+  private Configuration conf;
+  private RecordReader<Key,Value> reader;
+  private RecordWriter<Text,Mutation> writer;
+  
+  String inst;
+  String zookeepers;
+  String user;
+  String password;
+  String table;
+  Text tableName;
+  String auths;
+  Authorizations authorizations;
+  List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
+  
+  String start = null;
+  String end = null;
+  
+  int maxWriteThreads = 10;
+  long maxMutationBufferSize = 10 * 1000 * 1000;
+  int maxLatency = 10 * 1000;
+  
+  public AbstractAccumuloStorage() {}
+  
+  @Override
+  public Tuple getNext() throws IOException {
+    try {
+      // load the next pair
+      if (!reader.nextKeyValue())
+        return null;
+      
+      Key key = (Key) reader.getCurrentKey();
+      Value value = (Value) reader.getCurrentValue();
+      assert key != null && value != null;
+      return getTuple(key, value);
+    } catch (InterruptedException e) {
+      throw new IOException(e.getMessage());
     }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        this.reader = reader;
-    }
-
-    private void setLocationFromUri(String location) throws IOException
-    {
-        // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
-        String columns = "";
-        try
-        {
-            if (!location.startsWith("accumulo://"))
-                throw new Exception("Bad scheme.");
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                for (String param : urlParts[1].split("&"))
-                {
-                    String[] pair = param.split("=");
-                    if (pair[0].equals("instance"))
-                        inst = pair[1];
-                    else if (pair[0].equals("user"))
-                        user = pair[1];
-                    else if (pair[0].equals("password"))
-                        password = pair[1];
-                    else if (pair[0].equals("zookeepers"))
-                    	zookeepers = pair[1];
-                    else if (pair[0].equals("auths"))
-                    	auths = pair[1];
-                    else if (pair[0].equals("columns"))
-                    	columns = pair[1];
-                    else if (pair[0].equals("start"))
-                    	start = pair[1];
-                    else if (pair[0].equals("end"))
-                    	end = pair[1];
-                    else if (pair[0].equals("write_buffer_size_bytes"))
-                    	maxMutationBufferSize = Long.parseLong(pair[1]);
-                    else if (pair[0].equals("write_threads"))
-                    	maxWriteThreads = Integer.parseInt(pair[1]);
-                    else if (pair[0].equals("write_latency_ms"))
-                    	maxLatency = Integer.parseInt(pair[1]);
-                }
-            }
-            String[] parts = urlParts[0].split("/+");
-            table = parts[1];
-            tableName = new Text(table);
-            
-            if(auths == null || auths.equals(""))
-            {
-            	authorizations = new Authorizations();
-            }
-            else
-            {
-            	authorizations = new Authorizations(auths.split(","));
-            }
-            
-            if(!columns.equals("")){
-            	for(String cfCq : columns.split(","))
-            	{
-            		if(cfCq.contains("|"))
-            		{
-            			String[] c = cfCq.split("\\|");
-            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(c[0]), new Text(c[1])));
-            		}
-            		else
-            		{
-            			columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text(cfCq), null));
-            		}
-            	}
-            }
-            	
+  }
+  
+  protected abstract Tuple getTuple(Key key, Value value) throws IOException;
+  
+  @Override
+  public InputFormat getInputFormat() {
+    return new AccumuloInputFormat();
+  }
+  
+  @Override
+  public void prepareToRead(RecordReader reader, PigSplit split) {
+    this.reader = reader;
+  }
+  
+  private void setLocationFromUri(String location) throws IOException {
+    // ex:
+    // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+    String columns = "";
+    try {
+      if (!location.startsWith("accumulo://"))
+        throw new Exception("Bad scheme.");
+      String[] urlParts = location.split("\\?");
+      if (urlParts.length > 1) {
+        for (String param : urlParts[1].split("&")) {
+          String[] pair = param.split("=");
+          if (pair[0].equals("instance"))
+            inst = pair[1];
+          else if (pair[0].equals("user"))
+            user = pair[1];
+          else if (pair[0].equals("password"))
+            password = pair[1];
+          else if (pair[0].equals("zookeepers"))
+            zookeepers = pair[1];
+          else if (pair[0].equals("auths"))
+            auths = pair[1];
+          else if (pair[0].equals("columns"))
+            columns = pair[1];
+          else if (pair[0].equals("start"))
+            start = pair[1];
+          else if (pair[0].equals("end"))
+            end = pair[1];
+          else if (pair[0].equals("write_buffer_size_bytes"))
+            maxMutationBufferSize = Long.parseLong(pair[1]);
+          else if (pair[0].equals("write_threads"))
+            maxWriteThreads = Integer.parseInt(pair[1]);
+          else if (pair[0].equals("write_latency_ms"))
+            maxLatency = Integer.parseInt(pair[1]);
         }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&" +
-            						"[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': " + e.getMessage());
+      }
+      String[] parts = urlParts[0].split("/+");
+      table = parts[1];
+      tableName = new Text(table);
+      
+      if (auths == null || auths.equals("")) {
+        authorizations = new Authorizations();
+      } else {
+        authorizations = new Authorizations(auths.split(","));
+      }
+      
+      if (!columns.equals("")) {
+        for (String cfCq : columns.split(",")) {
+          if (cfCq.contains("|")) {
+            String[] c = cfCq.split("\\|");
+            columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(c[0]), new Text(c[1])));
+          } else {
+            columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(cfCq), null));
+          }
         }
+      }
+      
+    } catch (Exception e) {
+      throw new IOException(
+          "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
+              + "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]': "
+              + e.getMessage());
     }
+  }
+  
+  protected RecordWriter<Text,Mutation> getWriter() {
+    return writer;
+  }
+  
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    conf = job.getConfiguration();
+    setLocationFromUri(location);
     
-    protected RecordWriter<Text, Mutation> getWriter() {
-		return writer;
-	}
-    
-    @Override
-    public void setLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-        
-        if(!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf))
-        {
-        	AccumuloInputFormat.setInputTableName(job, table);
-        	AccumuloInputFormat.setScanAuthorizations(job, authorizations);
-            AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
-            
-        	try {
-				AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
-			} catch (AccumuloSecurityException e) {
-				throw new IOException(e);
-			}
-        	
-            if(columnFamilyColumnQualifierPairs.size() > 0)
-            {
-            	LOG.info("columns: "+columnFamilyColumnQualifierPairs);
-            	AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
-            }
-            
-            AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
-            configureInputFormat(conf);
-        }
+    if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+      AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+      
+      try {
+        AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+      } catch (AccumuloSecurityException e) {
+        throw new IOException(e);
+      }
+      
+      if (columnFamilyColumnQualifierPairs.size() > 0) {
+        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+        AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
+      }
+      
+      AccumuloInputFormat.setRanges(job, Collections.singleton(new Range(start, end)));
+      configureInputFormat(conf);
     }
-
-    protected void configureInputFormat(Configuration conf){}
-    
-    protected void configureOutputFormat(Configuration conf){}
+  }
+  
+  protected void configureInputFormat(Configuration conf) {}
+  
+  protected void configureOutputFormat(Configuration conf) {}
+  
+  @Override
+  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+    return location;
+  }
+  
+  @Override
+  public void setUDFContextSignature(String signature) {}
+  
+  /* StoreFunc methods */
+  public void setStoreFuncUDFContextSignature(String signature) {}
+  
+  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+    return relativeToAbsolutePath(location, curDir);
+  }
+  
+  public void setStoreLocation(String location, Job job) throws IOException {
+    conf = job.getConfiguration();
+    setLocationFromUri(location);
     
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
-    {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature){}
-
-    /* StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature){}
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
-    }
-    
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-
-        try
-        { 
-           if(!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf))
-           {
-        	   BatchWriterConfig bwConfig = new BatchWriterConfig();
-        	   bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
-        	   bwConfig.setMaxMemory(maxMutationBufferSize);
-        	   bwConfig.setMaxWriteThreads(maxWriteThreads);
-        	   AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
-        	   
-        	   AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
-        	   AccumuloOutputFormat.setDefaultTableName(job, table);
-        	   AccumuloOutputFormat.setCreateTables(job, true);
-        	   
-				try {
-					AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
-				} catch (AccumuloSecurityException e) {
-					throw new IOException(e);
-				}
-
-                configureOutputFormat(conf);
-           }
-        }
-        catch(java.lang.IllegalStateException e1){
-            e1.printStackTrace();
+    try {
+      if (!ConfiguratorBase.isConnectorInfoSet(AccumuloOutputFormat.class, conf)) {
+        BatchWriterConfig bwConfig = new BatchWriterConfig();
+        bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+        bwConfig.setMaxMemory(maxMutationBufferSize);
+        bwConfig.setMaxWriteThreads(maxWriteThreads);
+        AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+        
+        AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+        AccumuloOutputFormat.setDefaultTableName(job, table);
+        AccumuloOutputFormat.setCreateTables(job, true);
+        
+        try {
+          AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
+        } catch (AccumuloSecurityException e) {
+          throw new IOException(e);
         }
+        
+        configureOutputFormat(conf);
+      }
+    } catch (java.lang.IllegalStateException e1) {
+      e1.printStackTrace();
     }
-
-    public OutputFormat getOutputFormat()
-    {
-        return new AccumuloOutputFormat();
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // we don't care about types, they all get casted to ByteBuffers
-    }
-
-    public void prepareToWrite(RecordWriter writer)
-    {
-        this.writer = writer;
-    }
-    
-    public abstract Collection<Mutation> getMutations(Tuple tuple)throws ExecException, IOException;
-
-    public void putNext(Tuple tuple) throws ExecException, IOException
-    {
-    	Collection<Mutation> muts = getMutations(tuple);
-    	for(Mutation mut : muts)
-    	{
-    		try {
-    			getWriter().write(tableName, mut);
-    		} catch (InterruptedException e) {
-    			throw new IOException(e);
-    		}
-    	}
+  }
+  
+  public OutputFormat getOutputFormat() {
+    return new AccumuloOutputFormat();
+  }
+  
+  public void checkSchema(ResourceSchema schema) throws IOException {
+    // we don't care about types, they all get casted to ByteBuffers
+  }
+  
+  public void prepareToWrite(RecordWriter writer) {
+    this.writer = writer;
+  }
+  
+  public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException;
+  
+  public void putNext(Tuple tuple) throws ExecException, IOException {
+    Collection<Mutation> muts = getMutations(tuple);
+    for (Mutation mut : muts) {
+      try {
+        getWriter().write(tableName, mut);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     }
-
-    public void cleanupOnFailure(String failure, Job job){}
-    
-	@Override
-	public void cleanupOnSuccess(String location, Job job) throws IOException {}
+  }
+  
+  public void cleanupOnFailure(String failure, Job job) {}
+  
+  @Override
+  public void cleanupOnSuccess(String location, Job job) throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 5b146d6..4de1618 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -34,65 +34,56 @@ import org.apache.pig.data.TupleFactory;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long.
  * 
- * Tuples can be written in 2 forms:
- *  (key, colfam, colqual, colvis, value)
- *    OR
- *  (key, colfam, colqual, value)
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ * 
+ * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
  * 
  */
-public class AccumuloStorage extends AbstractAccumuloStorage
-{
-    private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
-
-    public AccumuloStorage(){}
-
-	@Override
-	protected Tuple getTuple(Key key, Value value) throws IOException {
-		// and wrap it in a tuple
-        Tuple tuple = TupleFactory.getInstance().newTuple(6);
-        tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-        tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-        tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-        tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-        tuple.set(4, new Long(key.getTimestamp()));
-        tuple.set(5, new DataByteArray(value.get()));
-        return tuple;
-	}
-	
-	@Override
-	public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-		
-		try {
-			Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
-			Text cf = Utils.objToText(tuple.get(1));
-			Text cq = Utils.objToText(tuple.get(2));
-			
-			if(tuple.size() > 4)
-			{
-				Text cv = Utils.objToText(tuple.get(3));
-				Value val = new Value(Utils.objToBytes(tuple.get(4)));
-				if(cv.getLength() == 0)
-				{
-					mut.put(cf, cq, val);
-				}
-				else
-				{
-					mut.put(cf, cq, new ColumnVisibility(cv), val);
-				}
-			}
-			else
-			{
-				Value val = new Value(Utils.objToBytes(tuple.get(3)));
-				mut.put(cf, cq, val);
-			}
-			
-			return Collections.singleton(mut);
-		} catch (IOException e) {
-		System.err.println("Error on Tuple: "+tuple);
-		throw e;
-	}
-	}
+public class AccumuloStorage extends AbstractAccumuloStorage {
+  private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
+  
+  public AccumuloStorage() {}
+  
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    // and wrap it in a tuple
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
+    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
+    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
+    tuple.set(4, new Long(key.getTimestamp()));
+    tuple.set(5, new DataByteArray(value.get()));
+    return tuple;
+  }
+  
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    
+    try {
+      Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+      Text cf = Utils.objToText(tuple.get(1));
+      Text cq = Utils.objToText(tuple.get(2));
+      
+      if (tuple.size() > 4) {
+        Text cv = Utils.objToText(tuple.get(3));
+        Value val = new Value(Utils.objToBytes(tuple.get(4)));
+        if (cv.getLength() == 0) {
+          mut.put(cf, cq, val);
+        } else {
+          mut.put(cf, cq, new ColumnVisibility(cv), val);
+        }
+      } else {
+        Value val = new Value(Utils.objToBytes(tuple.get(3)));
+        mut.put(cf, cq, val);
+      }
+      
+      return Collections.singleton(mut);
+    } catch (IOException e) {
+      System.err.println("Error on Tuple: " + tuple);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index a959dda..fcfd55e 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -43,77 +43,65 @@ import org.apache.pig.data.TupleFactory;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- *
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long.
  * 
- * Tuples can be written in 2 forms:
- *  (key, colfam, colqual, colvis, value)
- *    OR
- *  (key, colfam, colqual, value)
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ * 
+ * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value)
  * 
  */
-public class AccumuloWholeRowStorage extends AbstractAccumuloStorage
-{
-    private static final Log LOG = LogFactory.getLog(AccumuloWholeRowStorage.class);
-
-    public AccumuloWholeRowStorage(){}
-
-	@Override
-	protected Tuple getTuple(Key key, Value value) throws IOException {
-		
-		SortedMap<Key, Value> rowKVs =  WholeRowIterator.decodeRow(key, value);
-        List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
-        for(Entry<Key, Value> e : rowKVs.entrySet())
-        {
-        	columns.add(columnToTuple(
-        			e.getKey().getColumnFamily(), 
-        			e.getKey().getColumnQualifier(), 
-        			e.getKey().getColumnVisibility(), 
-        			e.getKey().getTimestamp(), 
-        			e.getValue())
-        		);
-        }
-        
-        // and wrap it in a tuple
-        Tuple tuple = TupleFactory.getInstance().newTuple(2);
-        tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-        tuple.set(1, new DefaultDataBag(columns));
-        
-        return tuple;
-	}
-
-	private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException
-    {
-        Tuple tuple = TupleFactory.getInstance().newTuple(5);
-        tuple.set(0, new DataByteArray(colfam.getBytes()));
-        tuple.set(1, new DataByteArray(colqual.getBytes()));
-        tuple.set(2, new DataByteArray(colvis.getBytes()));
-        tuple.set(3, new Long(ts));
-        tuple.set(4, new DataByteArray(val.get()));
-        return tuple;
-    }
-	
-    protected void configureInputFormat(Configuration conf)
-    {
-    	AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
+  private static final Log LOG = LogFactory.getLog(AccumuloWholeRowStorage.class);
+  
+  public AccumuloWholeRowStorage() {}
+  
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    
+    SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
+    List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
+    for (Entry<Key,Value> e : rowKVs.entrySet()) {
+      columns.add(columnToTuple(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), e.getKey().getColumnVisibility(), e.getKey().getTimestamp(),
+          e.getValue()));
     }
     
-    @Override
-    public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
-    	
-    	Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
-        DefaultDataBag columns = (DefaultDataBag)tuple.get(1);
-        for(Tuple column : columns)
-        {
-        	Text cf = Utils.objToText(column.get(0));
-        	Text cq = Utils.objToText(column.get(1));
-        	Text cv = Utils.objToText(column.get(2));
-        	Long ts = (Long)column.get(3);
-        	Value val = new Value(Utils.objToBytes(column.get(4)));
-        	
-        	mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
-        }
-    	
-    	return Collections.singleton(mut);
+    // and wrap it in a tuple
+    Tuple tuple = TupleFactory.getInstance().newTuple(2);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    tuple.set(1, new DefaultDataBag(columns));
+    
+    return tuple;
+  }
+  
+  private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, new DataByteArray(colfam.getBytes()));
+    tuple.set(1, new DataByteArray(colqual.getBytes()));
+    tuple.set(2, new DataByteArray(colvis.getBytes()));
+    tuple.set(3, new Long(ts));
+    tuple.set(4, new DataByteArray(val.get()));
+    return tuple;
+  }
+  
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+  }
+  
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    
+    Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
+    DefaultDataBag columns = (DefaultDataBag) tuple.get(1);
+    for (Tuple column : columns) {
+      Text cf = Utils.objToText(column.get(0));
+      Text cq = Utils.objToText(column.get(1));
+      Text cv = Utils.objToText(column.get(2));
+      Long ts = (Long) column.get(3);
+      Value val = new Value(Utils.objToBytes(column.get(4)));
+      
+      mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
     }
+    
+    return Collections.singleton(mut);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/main/java/org/apache/accumulo/pig/Utils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/Utils.java b/src/main/java/org/apache/accumulo/pig/Utils.java
index 6b7fdf9..4fecd99 100644
--- a/src/main/java/org/apache/accumulo/pig/Utils.java
+++ b/src/main/java/org/apache/accumulo/pig/Utils.java
@@ -20,45 +20,37 @@ import org.apache.hadoop.io.Text;
 import org.apache.pig.data.DataByteArray;
 
 public class Utils {
-	public static Text objToText(Object o)
-    {
-    	return new Text(objToBytes(o));
+  public static Text objToText(Object o) {
+    return new Text(objToBytes(o));
+  }
+  
+  public static byte[] objToBytes(Object o) {
+    if (o == null) {
+      return new byte[0];
     }
     
-    public static byte[] objToBytes(Object o)
-    {
-    	if( o == null)
-    	{
-    		return new byte[0];
-    	}
-    	
-    	if (o instanceof String) {
-			String str = (String) o;
-			return str.getBytes();
-		}
-    	else if (o instanceof Long) {
-			Long l = (Long) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Integer) {
-    		Integer l = (Integer) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Boolean) {
-    		Boolean l = (Boolean) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Float) {
-    		Float l = (Float) o;
-			return l.toString().getBytes();
-		}
-    	else if (o instanceof Double) {
-    		Double l = (Double) o;
-			return l.toString().getBytes();
-		}
-    	
-    	// TODO: handle DataBag, Map<Object, Object>, and Tuple
-    	
-    	return ((DataByteArray)o).get();
+    if (o instanceof String) {
+      String str = (String) o;
+      return str.getBytes();
+    } else if (o instanceof Long) {
+      Long l = (Long) o;
+      return l.toString().getBytes();
+    } else if (o instanceof Integer) {
+      Integer l = (Integer) o;
+      return l.toString().getBytes();
+    } else if (o instanceof Boolean) {
+      Boolean l = (Boolean) o;
+      return l.toString().getBytes();
+    } else if (o instanceof Float) {
+      Float l = (Float) o;
+      return l.toString().getBytes();
+    } else if (o instanceof Double) {
+      Double l = (Double) o;
+      return l.toString().getBytes();
     }
+    
+    // TODO: handle DataBag, Map<Object, Object>, and Tuple
+    
+    return ((DataByteArray) o).get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 402c74a..9b9c3c7 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -36,121 +36,116 @@ import org.apache.pig.data.Tuple;
 import org.junit.Test;
 
 public class AbstractAccumuloStorageTest {
-	
-	public Job getExpectedLoadJob(String inst, String zookeepers,  String user, String password,String table, 
-			String start,String end,Authorizations authorizations, List<Pair<Text, Text>> columnFamilyColumnQualifierPairs) throws IOException
-	{
-	    Collection<Range> ranges = new LinkedList<Range>();
-	    ranges.add(new Range(start, end));
-	    
-		Job expected = new Job();
-		Configuration expectedConf = expected.getConfiguration();
-		AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
-        AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-        AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
-        AccumuloInputFormat.setRanges(expectedConf, ranges);
-        return expected;
-	}
-	
-	public Job getDefaultExpectedLoadJob() throws IOException
-	{
-		String inst = "myinstance";
-	    String zookeepers = "127.0.0.1:2181";
-	    String user = "root";
-	    String password = "secret";
-	    String table = "table1";
-	    String start = "abc";
-	    String end = "z";
-	    Authorizations authorizations = new Authorizations("PRIVATE,PUBLIC".split(","));
-	    
-	    List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
-	    columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col1"), new Text("cq1")));
-	    columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col2"), new Text("cq2")));
-	    columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>(new Text("col3"), null));
-	    
-		Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
-		return expected;
-	}
-	
-	public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password,String table, long maxWriteBufferSize, int writeThreads, int maxWriteLatencyMS) throws IOException
-	{
-		Job expected = new Job();
-		Configuration expectedConf = expected.getConfiguration();
-		AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
-        AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-        AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
-        AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
-        AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
-        
-        return expected;
-	}
-	
-	public Job getDefaultExpectedStoreJob() throws IOException
-	{
-		String inst = "myinstance";
-	    String zookeepers = "127.0.0.1:2181";
-	    String user = "root";
-	    String password = "secret";
-	    String table = "table1";
-	    long maxWriteBufferSize = 1234000;
-	    int writeThreads = 7;
-	    int maxWriteLatencyMS = 30000;
-	    
-		Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
-		return expected;
-	}
-	
-	public String getDefaultLoadLocation()
-	{
-		return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
-	}
-	
-	public String getDefaultStoreLocation()
-	{
-		return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
-	}
-	
-	public AbstractAccumuloStorage getAbstractAccumuloStorage()
-	{
-		AbstractAccumuloStorage s = new AbstractAccumuloStorage() {
-			
-			@Override
-			public Collection<Mutation> getMutations(Tuple tuple) {return null;}
-
-			@Override
-			protected Tuple getTuple(Key key, Value value) throws IOException {return null;}
-		};
-		return s;
-	}
-	
-	
-	@Test
-	public void testSetLoadLocation() throws IOException
-	{
-		AbstractAccumuloStorage s = getAbstractAccumuloStorage();
-		
-		Job actual = new Job();
-		s.setLocation(getDefaultLoadLocation(), actual);
-		Configuration actualConf = actual.getConfiguration();
-		
-		Job expected = getDefaultExpectedLoadJob();
-		Configuration expectedConf = expected.getConfiguration();
-		
-		TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
-	}
-	
-	@Test
-	public void testSetStoreLocation() throws IOException
-	{
-		AbstractAccumuloStorage s = getAbstractAccumuloStorage();
-		
-		Job actual = new Job();
-		s.setStoreLocation(getDefaultStoreLocation(), actual);
-		Configuration actualConf = actual.getConfiguration();
-		
-		Job expected = getDefaultExpectedStoreJob();
-		Configuration expectedConf = expected.getConfiguration();
-		
-		TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
-	}
+  
+  public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
+      Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
+    Collection<Range> ranges = new LinkedList<Range>();
+    ranges.add(new Range(start, end));
+    
+    Job expected = new Job();
+    Configuration expectedConf = expected.getConfiguration();
+    AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+    AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
+    AccumuloInputFormat.setRanges(expectedConf, ranges);
+    return expected;
+  }
+  
+  public Job getDefaultExpectedLoadJob() throws IOException {
+    String inst = "myinstance";
+    String zookeepers = "127.0.0.1:2181";
+    String user = "root";
+    String password = "secret";
+    String table = "table1";
+    String start = "abc";
+    String end = "z";
+    Authorizations authorizations = new Authorizations("PRIVATE,PUBLIC".split(","));
+    
+    List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>();
+    columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col1"), new Text("cq1")));
+    columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
+    columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
+    
+    Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+    return expected;
+  }
+  
+  public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+      int maxWriteLatencyMS) throws IOException {
+    Job expected = new Job();
+    Configuration expectedConf = expected.getConfiguration();
+    AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
+    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+    
+    return expected;
+  }
+  
+  public Job getDefaultExpectedStoreJob() throws IOException {
+    String inst = "myinstance";
+    String zookeepers = "127.0.0.1:2181";
+    String user = "root";
+    String password = "secret";
+    String table = "table1";
+    long maxWriteBufferSize = 1234000;
+    int writeThreads = 7;
+    int maxWriteLatencyMS = 30000;
+    
+    Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+    return expected;
+  }
+  
+  public String getDefaultLoadLocation() {
+    return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
+  }
+  
+  public String getDefaultStoreLocation() {
+    return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&write_buffer_size_bytes=1234000&write_threads=7&write_latency_ms=30000";
+  }
+  
+  public AbstractAccumuloStorage getAbstractAccumuloStorage() {
+    AbstractAccumuloStorage s = new AbstractAccumuloStorage() {
+      
+      @Override
+      public Collection<Mutation> getMutations(Tuple tuple) {
+        return null;
+      }
+      
+      @Override
+      protected Tuple getTuple(Key key, Value value) throws IOException {
+        return null;
+      }
+    };
+    return s;
+  }
+  
+  @Test
+  public void testSetLoadLocation() throws IOException {
+    AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+    
+    Job actual = new Job();
+    s.setLocation(getDefaultLoadLocation(), actual);
+    Configuration actualConf = actual.getConfiguration();
+    
+    Job expected = getDefaultExpectedLoadJob();
+    Configuration expectedConf = expected.getConfiguration();
+    
+    TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+  }
+  
+  @Test
+  public void testSetStoreLocation() throws IOException {
+    AbstractAccumuloStorage s = getAbstractAccumuloStorage();
+    
+    Job actual = new Job();
+    s.setStoreLocation(getDefaultStoreLocation(), actual);
+    Configuration actualConf = actual.getConfiguration();
+    
+    Job expected = getDefaultExpectedStoreJob();
+    Configuration expectedConf = expected.getConfiguration();
+    
+    TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index dd45a1a..fbd68c6 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -33,75 +33,72 @@ import org.apache.pig.data.TupleFactory;
 import org.junit.Test;
 
 public class AccumuloStorageTest {
-	
-	@Test
-	public void testGetMutations4() throws Exception
-	{
-		AccumuloStorage s = new AccumuloStorage();
-		
-		Tuple tuple = TupleFactory.getInstance().newTuple(4);
-		tuple.set(0, "row1");
-		tuple.set(1, "cf1");
-		tuple.set(2, "cq1");
-		tuple.set(3, "val1");
-		
-		Collection<Mutation> muts = s.getMutations(tuple);
-		
-		assertNotNull(muts);
-		assertEquals(1, muts.size());
-		Mutation mut = muts.iterator().next();
-		List<ColumnUpdate> updates = mut.getUpdates();
-		assertEquals(1, updates.size());
-		ColumnUpdate update = updates.get(0);
-		
-		assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
-		assertTrue(Arrays.equals(((String)tuple.get(1)).getBytes(), update.getColumnFamily()));
-		assertTrue(Arrays.equals(((String)tuple.get(2)).getBytes(), update.getColumnQualifier()));
-		assertTrue(Arrays.equals(((String)tuple.get(3)).getBytes(), update.getValue()));
-		assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
-	}
-	
-	@Test
-	public void testGetMutations5() throws Exception
-	{
-		AccumuloStorage s = new AccumuloStorage();
-		
-		Tuple tuple = TupleFactory.getInstance().newTuple(5);
-		tuple.set(0, "row1");
-		tuple.set(1, "cf1");
-		tuple.set(2, "cq1");
-		tuple.set(3, "cv1");
-		tuple.set(4, "val1");
-		
-		Collection<Mutation> muts = s.getMutations(tuple);
-		
-		assertNotNull(muts);
-		assertEquals(1, muts.size());
-		Mutation mut = muts.iterator().next();
-		List<ColumnUpdate> updates = mut.getUpdates();
-		assertEquals(1, updates.size());
-		ColumnUpdate update = updates.get(0);
-		
-		assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
-		assertTrue(Arrays.equals(((String)tuple.get(1)).getBytes(), update.getColumnFamily()));
-		assertTrue(Arrays.equals(((String)tuple.get(2)).getBytes(), update.getColumnQualifier()));
-		assertTrue(Arrays.equals(((String)tuple.get(3)).getBytes(), update.getColumnVisibility()));
-		assertTrue(Arrays.equals(((String)tuple.get(4)).getBytes(), update.getValue()));
-	}
-	
-	@Test
-	public void testGetTuple() throws Exception
-	{
-		AccumuloStorage s = new AccumuloStorage();
-		
-		Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
-		Value value = new Value("val1".getBytes());
-		Tuple tuple  = s.getTuple(key, value);
-		TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-		
-		key = new Key("row1", "cf1", "cq1");
-		value = new Value("val1".getBytes());
-		tuple  = s.getTuple(key, value);
-		TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-	}
+  
+  @Test
+  public void testGetMutations4() throws Exception {
+    AccumuloStorage s = new AccumuloStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(4);
+    tuple.set(0, "row1");
+    tuple.set(1, "cf1");
+    tuple.set(2, "cq1");
+    tuple.set(3, "val1");
+    
+    Collection<Mutation> muts = s.getMutations(tuple);
+    
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    List<ColumnUpdate> updates = mut.getUpdates();
+    assertEquals(1, updates.size());
+    ColumnUpdate update = updates.get(0);
+    
+    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
+    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
+    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
+    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue()));
+    assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
+  }
+  
+  @Test
+  public void testGetMutations5() throws Exception {
+    AccumuloStorage s = new AccumuloStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, "row1");
+    tuple.set(1, "cf1");
+    tuple.set(2, "cq1");
+    tuple.set(3, "cv1");
+    tuple.set(4, "val1");
+    
+    Collection<Mutation> muts = s.getMutations(tuple);
+    
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    List<ColumnUpdate> updates = mut.getUpdates();
+    assertEquals(1, updates.size());
+    ColumnUpdate update = updates.get(0);
+    
+    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
+    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
+    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
+    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility()));
+    assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue()));
+  }
+  
+  @Test
+  public void testGetTuple() throws Exception {
+    AccumuloStorage s = new AccumuloStorage();
+    
+    Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
+    Value value = new Value("val1".getBytes());
+    Tuple tuple = s.getTuple(key, value);
+    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+    
+    key = new Key("row1", "cf1", "cq1");
+    value = new Value("val1".getBytes());
+    tuple = s.getTuple(key, value);
+    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 750ebea..f8e8fe1 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -45,120 +45,108 @@ import org.apache.pig.data.TupleFactory;
 import org.junit.Test;
 
 public class AccumuloWholeRowStorageTest {
-	
-	@Test
-	public void testConfiguration() throws IOException
-	{
-		AbstractAccumuloStorageTest test = new AbstractAccumuloStorageTest();
-		
-		AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
-		
-		Job actual = new Job();
-		s.setLocation(test.getDefaultLoadLocation(), actual);
-		Configuration actualConf = actual.getConfiguration();
-		
-		Job expected =  test.getDefaultExpectedLoadJob();
-		Configuration expectedConf = expected.getConfiguration();
-		AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
-		
-		TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
-	}
-	
-	public static Tuple generateTuple(String cf, String cq, String cv, Long ts, String val) throws ExecException
-	{
-		Tuple tuple = TupleFactory.getInstance().newTuple(5);
-		tuple.set(0, new DataByteArray(cf.getBytes()));
-		tuple.set(1, new DataByteArray(cq.getBytes()));
-		tuple.set(2, new DataByteArray(cv.getBytes()));
-		tuple.set(3, ts);
-		tuple.set(4, new DataByteArray(val.getBytes()));
-		return tuple;
-	}
-	
-	@Test
-	public void testGetMutations() throws Exception
-	{
-		Tuple tuple = TupleFactory.getInstance().newTuple(2);
-		tuple.set(0, "row1");
-		
-		DefaultDataBag bag = new DefaultDataBag();
-		bag.add(generateTuple("cf1", "cq1", "cv1", 1L, "val1"));
-		bag.add(generateTuple("cf2", "cq2", "cv2", 2L, "val2"));
-		bag.add(generateTuple("cf3", "cq3", "cv3", 3L, "val3"));
-		tuple.set(1, bag);
-		
-		AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
-		Collection<Mutation> muts = s.getMutations(tuple);
-		
-		assertNotNull(muts);
-		assertEquals(1, muts.size());
-		Mutation mut = muts.iterator().next();
-		
-		List<ColumnUpdate> updates = mut.getUpdates();
-		assertEquals(3, updates.size());
-		
-		assertTrue(Arrays.equals(((String)tuple.get(0)).getBytes(), mut.getRow()));
-		
-		Iterator<Tuple> iter = bag.iterator();
-		for(ColumnUpdate update : updates)
-		{
-			Tuple colTuple = iter.next();
-			
-			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(0)).get(), update.getColumnFamily()));
-			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(1)).get(), update.getColumnQualifier()));
-			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(2)).get(), update.getColumnVisibility()));
-			assertEquals(((Long)colTuple.get(3)).longValue(), update.getTimestamp());
-			assertTrue(Arrays.equals(((DataByteArray)colTuple.get(4)).get(), update.getValue()));
-		}
-	}
-	
-	@Test
-	public void testGetTuple() throws Exception
-	{
-		AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
-		
-		Key key = new Key("row");
-		
-		List<Key> keys = new ArrayList<Key>(3);
-		keys.add(new Key("row", "cf1", "cf1", "cv1", 1L));
-		keys.add(new Key("row", "cf2", "cf2", "cv2", 2L));
-		keys.add(new Key("row", "cf3", "cf3", "cv3", 3L));
-		
-		List<Value> values = new ArrayList<Value>(3);
-		values.add(new Value("1".getBytes()));
-		values.add(new Value("2".getBytes()));
-		values.add(new Value("3".getBytes()));
-		
-		Value value = WholeRowIterator.encodeRow(keys, values);
-		
-		List<Tuple> columns = new LinkedList<Tuple>();
-		for(int i = 0; i < keys.size(); ++i)
-		{
-			columns.add(columnToTuple(
-							keys.get(i).getColumnFamily().toString(), 
-							keys.get(i).getColumnQualifier().toString(), 
-							keys.get(i).getColumnVisibility().toString(), 
-							keys.get(i).getTimestamp(), 
-							new String(values.get(i).get())));
-		}
-		
-		Tuple tuple = TupleFactory.getInstance().newTuple(2);
-        tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-        tuple.set(1, new DefaultDataBag(columns));
-        
-		TestUtils.assertWholeRowKeyValueEqualsTuple(key, value, tuple);
-	}
-	
-	private Tuple columnToTuple(String colfam, String colqual, String colvis, long ts, String val) throws IOException
-    {
-        Tuple tuple = TupleFactory.getInstance().newTuple(5);
-        tuple.set(0, new DataByteArray(colfam.getBytes()));
-        tuple.set(1, new DataByteArray(colqual.getBytes()));
-        tuple.set(2, new DataByteArray(colvis.getBytes()));
-        tuple.set(3, new Long(ts));
-        tuple.set(4, new DataByteArray(val.getBytes()));
-        return tuple;
+  
+  @Test
+  public void testConfiguration() throws IOException {
+    AbstractAccumuloStorageTest test = new AbstractAccumuloStorageTest();
+    
+    AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+    
+    Job actual = new Job();
+    s.setLocation(test.getDefaultLoadLocation(), actual);
+    Configuration actualConf = actual.getConfiguration();
+    
+    Job expected = test.getDefaultExpectedLoadJob();
+    Configuration expectedConf = expected.getConfiguration();
+    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
+    
+    TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
+  }
+  
+  public static Tuple generateTuple(String cf, String cq, String cv, Long ts, String val) throws ExecException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, new DataByteArray(cf.getBytes()));
+    tuple.set(1, new DataByteArray(cq.getBytes()));
+    tuple.set(2, new DataByteArray(cv.getBytes()));
+    tuple.set(3, ts);
+    tuple.set(4, new DataByteArray(val.getBytes()));
+    return tuple;
+  }
+  
+  @Test
+  public void testGetMutations() throws Exception {
+    Tuple tuple = TupleFactory.getInstance().newTuple(2);
+    tuple.set(0, "row1");
+    
+    DefaultDataBag bag = new DefaultDataBag();
+    bag.add(generateTuple("cf1", "cq1", "cv1", 1L, "val1"));
+    bag.add(generateTuple("cf2", "cq2", "cv2", 2L, "val2"));
+    bag.add(generateTuple("cf3", "cq3", "cv3", 3L, "val3"));
+    tuple.set(1, bag);
+    
+    AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+    Collection<Mutation> muts = s.getMutations(tuple);
+    
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    
+    List<ColumnUpdate> updates = mut.getUpdates();
+    assertEquals(3, updates.size());
+    
+    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
+    
+    Iterator<Tuple> iter = bag.iterator();
+    for (ColumnUpdate update : updates) {
+      Tuple colTuple = iter.next();
+      
+      assertTrue(Arrays.equals(((DataByteArray) colTuple.get(0)).get(), update.getColumnFamily()));
+      assertTrue(Arrays.equals(((DataByteArray) colTuple.get(1)).get(), update.getColumnQualifier()));
+      assertTrue(Arrays.equals(((DataByteArray) colTuple.get(2)).get(), update.getColumnVisibility()));
+      assertEquals(((Long) colTuple.get(3)).longValue(), update.getTimestamp());
+      assertTrue(Arrays.equals(((DataByteArray) colTuple.get(4)).get(), update.getValue()));
     }
-	
-	
+  }
+  
+  @Test
+  public void testGetTuple() throws Exception {
+    AccumuloWholeRowStorage s = new AccumuloWholeRowStorage();
+    
+    Key key = new Key("row");
+    
+    List<Key> keys = new ArrayList<Key>(3);
+    keys.add(new Key("row", "cf1", "cf1", "cv1", 1L));
+    keys.add(new Key("row", "cf2", "cf2", "cv2", 2L));
+    keys.add(new Key("row", "cf3", "cf3", "cv3", 3L));
+    
+    List<Value> values = new ArrayList<Value>(3);
+    values.add(new Value("1".getBytes()));
+    values.add(new Value("2".getBytes()));
+    values.add(new Value("3".getBytes()));
+    
+    Value value = WholeRowIterator.encodeRow(keys, values);
+    
+    List<Tuple> columns = new LinkedList<Tuple>();
+    for (int i = 0; i < keys.size(); ++i) {
+      columns.add(columnToTuple(keys.get(i).getColumnFamily().toString(), keys.get(i).getColumnQualifier().toString(), keys.get(i).getColumnVisibility()
+          .toString(), keys.get(i).getTimestamp(), new String(values.get(i).get())));
+    }
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(2);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    tuple.set(1, new DefaultDataBag(columns));
+    
+    TestUtils.assertWholeRowKeyValueEqualsTuple(key, value, tuple);
+  }
+  
+  private Tuple columnToTuple(String colfam, String colqual, String colvis, long ts, String val) throws IOException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, new DataByteArray(colfam.getBytes()));
+    tuple.set(1, new DataByteArray(colqual.getBytes()));
+    tuple.set(2, new DataByteArray(colvis.getBytes()));
+    tuple.set(3, new Long(ts));
+    tuple.set(4, new DataByteArray(val.getBytes()));
+    return tuple;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/6d494666/src/test/java/org/apache/accumulo/pig/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/TestUtils.java b/src/test/java/org/apache/accumulo/pig/TestUtils.java
index 5a9019b..6c8bebf 100644
--- a/src/test/java/org/apache/accumulo/pig/TestUtils.java
+++ b/src/test/java/org/apache/accumulo/pig/TestUtils.java
@@ -34,51 +34,45 @@ import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
 
 public class TestUtils {
-	public static void assertConfigurationsEqual(Configuration expectedConf, Configuration actualConf)
-	{
-		// Basically, for all the keys in expectedConf, make sure the values in both confs are equal
-        Iterator<Entry<String, String>> expectedIter = expectedConf.iterator();
-        while(expectedIter.hasNext())
-        {
-        	Entry<String, String> e = expectedIter.next();
-        	assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
-        }
-        
-        // Basically, for all the keys in actualConf, make sure the values in both confs are equal
-        Iterator<Entry<String, String>> actualIter = actualConf.iterator();
-        while(actualIter.hasNext())
-        {
-        	Entry<String, String> e = actualIter.next();
-        	assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
-        }
-	}
-	
-	public static void assertKeyValueEqualsTuple(Key key, Value value, Tuple tuple) throws ExecException
-	{
-		assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray)tuple.get(0)).get()));
-		assertTrue(Arrays.equals(key.getColumnFamily().getBytes(), ((DataByteArray)tuple.get(1)).get()));
-		assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(), ((DataByteArray)tuple.get(2)).get()));
-		assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(), ((DataByteArray)tuple.get(3)).get()));
-		assertEquals(key.getTimestamp(), ((Long)tuple.get(4)).longValue());
-		assertTrue(Arrays.equals(value.get(), ((DataByteArray)tuple.get(5)).get()));
-	}
-	
-	public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value, Tuple mainTuple) throws IOException
-	{
-		assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray)mainTuple.get(0)).get()));
-		
-		DefaultDataBag bag = (DefaultDataBag)mainTuple.get(1);
-		Iterator<Tuple> iter = bag.iterator();		
-		
-		for(Entry<Key, Value> e : WholeRowIterator.decodeRow(key, value).entrySet())
-		{
-			Tuple tuple = iter.next();
-			
-			assertTrue(Arrays.equals(e.getKey().getColumnFamily().getBytes(), ((DataByteArray)tuple.get(0)).get()));
-			assertTrue(Arrays.equals(e.getKey().getColumnQualifier().getBytes(), ((DataByteArray)tuple.get(1)).get()));
-			assertTrue(Arrays.equals(e.getKey().getColumnVisibility().getBytes(), ((DataByteArray)tuple.get(2)).get()));
-			assertEquals(e.getKey().getTimestamp(), ((Long)tuple.get(3)).longValue());
-			assertTrue(Arrays.equals(e.getValue().get(), ((DataByteArray)tuple.get(4)).get()));
-		}
-	}
+  /**
+   * Asserts that two configurations resolve every key present in either one to the same value.
+   * <p>
+   * Both directions are checked so that a key that exists in only one of the configurations is detected as well.
+   * 
+   * @param expectedConf
+   *          the configuration holding the expected values
+   * @param actualConf
+   *          the configuration under test
+   */
+  public static void assertConfigurationsEqual(Configuration expectedConf, Configuration actualConf) {
+    // Every key known to expectedConf must resolve to the same value in actualConf
+    for (Entry<String,String> e : expectedConf) {
+      String name = e.getKey();
+      assertEquals("Mismatch for configuration key " + name, expectedConf.get(name), actualConf.get(name));
+    }
+    
+    // Every key known to actualConf must resolve to the same value in expectedConf (catches keys only set in actualConf)
+    for (Entry<String,String> e : actualConf) {
+      String name = e.getKey();
+      assertEquals("Mismatch for configuration key " + name, expectedConf.get(name), actualConf.get(name));
+    }
+  }
+  
+  public static void assertKeyValueEqualsTuple(Key key, Value value, Tuple tuple) throws ExecException {
+    assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray) tuple.get(0)).get()));
+    assertTrue(Arrays.equals(key.getColumnFamily().getBytes(), ((DataByteArray) tuple.get(1)).get()));
+    assertTrue(Arrays.equals(key.getColumnQualifier().getBytes(), ((DataByteArray) tuple.get(2)).get()));
+    assertTrue(Arrays.equals(key.getColumnVisibility().getBytes(), ((DataByteArray) tuple.get(3)).get()));
+    assertEquals(key.getTimestamp(), ((Long) tuple.get(4)).longValue());
+    assertTrue(Arrays.equals(value.get(), ((DataByteArray) tuple.get(5)).get()));
+  }
+  
+  /**
+   * Asserts that a whole-row tuple mirrors a key/value pair produced by {@link WholeRowIterator}.
+   * <p>
+   * Field 0 of {@code mainTuple} must hold the row bytes. Field 1 must be a {@link DefaultDataBag} with exactly one five-field tuple (column family, column
+   * qualifier, column visibility, timestamp, value) per entry of the decoded row, in the order the decoded entries are iterated.
+   * 
+   * @param key
+   *          the key whose row is expected in field 0 and which is passed to {@code decodeRow}
+   * @param value
+   *          the encoded row to decode
+   * @param mainTuple
+   *          the tuple under test
+   * @throws IOException
+   *           if the row cannot be decoded or a tuple field cannot be read
+   */
+  public static void assertWholeRowKeyValueEqualsTuple(Key key, Value value, Tuple mainTuple) throws IOException {
+    assertTrue(Arrays.equals(key.getRow().getBytes(), ((DataByteArray) mainTuple.get(0)).get()));
+    
+    DefaultDataBag bag = (DefaultDataBag) mainTuple.get(1);
+    Iterator<Tuple> iter = bag.iterator();
+    
+    for (Entry<Key,Value> e : WholeRowIterator.decodeRow(key, value).entrySet()) {
+      // Fail with an assertion, not a NoSuchElementException, when the bag runs out early
+      assertTrue("Bag has fewer tuples than the encoded row has columns", iter.hasNext());
+      Tuple tuple = iter.next();
+      Key column = e.getKey();
+      
+      assertTrue(Arrays.equals(column.getColumnFamily().getBytes(), ((DataByteArray) tuple.get(0)).get()));
+      assertTrue(Arrays.equals(column.getColumnQualifier().getBytes(), ((DataByteArray) tuple.get(1)).get()));
+      assertTrue(Arrays.equals(column.getColumnVisibility().getBytes(), ((DataByteArray) tuple.get(2)).get()));
+      assertEquals(column.getTimestamp(), ((Long) tuple.get(3)).longValue());
+      assertTrue(Arrays.equals(e.getValue().get(), ((DataByteArray) tuple.get(4)).get()));
+    }
+    
+    // Leftover tuples would be columns that are not part of the encoded row
+    assertTrue("Bag has more tuples than the encoded row has columns", !iter.hasNext());
+  }
 }