You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by md...@apache.org on 2014/04/22 01:48:04 UTC

[02/11] ACCUMULO-1880 create mapreduce module

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
new file mode 100644
index 0000000..2864016
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloMultiTableInputFormatTest {
+
+  private static final String PREFIX = AccumuloMultiTableInputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2";
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Mapper that checks each entry against the row/value pattern written by the test and records assertion failures in the enclosing class's static
+     * {@code e1} / {@code e2} fields instead of throwing them.
+     */
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      // Key seen by the previous map() call; stays null until the first entry has been mapped.
+      Key key = null;
+      // Number of entries mapped so far.
+      int count = 0;
+
+      /**
+       * Verifies that the entry is row {@code <table>_<count + 1>} with value {@code <table>_<count>} (both formatted with {@code %09x}), where the table
+       * name is taken from the current input split, and that the value equals the row of the previously mapped key.
+       */
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+        try {
+          final String table = ((RangeInputSplit) reporter.getInputSplit()).getTableName();
+          final String actualValue = new String(v.get());
+          if (key != null) {
+            assertEquals(key.getRow().toString(), actualValue);
+          }
+          final Text expectedRow = new Text(String.format("%s_%09x", table, count + 1));
+          assertEquals(expectedRow, k.getRow());
+          final String expectedValue = String.format("%s_%09x", table, count);
+          assertEquals(expectedValue, actualValue);
+        } catch (AssertionError failure) {
+          e1 = failure;
+        }
+        count = count + 1;
+        key = new Key(k);
+      }
+
+      /** No configuration is needed. */
+      @Override
+      public void configure(JobConf job) {}
+
+      /** Verifies that exactly 100 entries were mapped; a failure is stored in {@code e2}. */
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError failure) {
+          e2 = failure;
+        }
+      }
+
+    }
+
+    /**
+     * Configures and runs a map-only job that reads two tables from the mock instance through {@link AccumuloMultiTableInputFormat} and discards its
+     * output.
+     *
+     * @param args
+     *          exactly four values: user, password, first table, second table
+     * @return 0 if the job reports success, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly four values
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table1> <table2>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table1 = args[2];
+      String table2 = args[3];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      // Run the job with the input format this test is named after; everything below is configured through the same class.
+      job.setInputFormat(AccumuloMultiTableInputFormat.class);
+
+      AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      // Both tables use an InputTableConfig with no settings applied.
+      InputTableConfig tableConfig1 = new InputTableConfig();
+      InputTableConfig tableConfig2 = new InputTableConfig();
+
+      Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
+      configMap.put(table1, tableConfig1);
+      configMap.put(table2, tableConfig2);
+
+      AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      // Map-only job: the mapper performs all verification.
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    /** Runs this tool through {@link ToolRunner} with the cached configuration and asserts that it exits with 0. */
+    public static void main(String[] args) throws Exception {
+      final int exitCode = ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args);
+      assertEquals(0, exitCode);
+    }
+  }
+
+  /**
+   * Writes 100 entries to each of two tables on a mock instance, runs the map-only job over both tables and checks that the mapper recorded no
+   * assertion failures.
+   */
+  @Test
+  public void testMap() throws Exception {
+    final Connector conn = new MockInstance(INSTANCE_NAME).getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create(TEST_TABLE_1);
+    conn.tableOperations().create(TEST_TABLE_2);
+
+    final BatchWriter firstWriter = conn.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    final BatchWriter secondWriter = conn.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
+    // Row n+1 carries the value n, each prefixed with the name of the table it is written to.
+    final String pattern = "%s_%09x";
+    for (int n = 0; n < 100; n++) {
+      final Mutation first = new Mutation(new Text(String.format(pattern, TEST_TABLE_1, n + 1)));
+      first.put(new Text(), new Text(), new Value(String.format(pattern, TEST_TABLE_1, n).getBytes()));
+      firstWriter.addMutation(first);
+
+      final Mutation second = new Mutation(new Text(String.format(pattern, TEST_TABLE_2, n + 1)));
+      second.put(new Text(), new Text(), new Value(String.format(pattern, TEST_TABLE_2, n).getBytes()));
+      secondWriter.addMutation(second);
+    }
+    firstWriter.close();
+    secondWriter.close();
+
+    final String[] jobArgs = {"root", "", TEST_TABLE_1, TEST_TABLE_2};
+    MRTester.main(jobArgs);
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  /**
+   * Verify {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} objects stored in a {@link JobConf} through
+   * {@link AccumuloMultiTableInputFormat#setInputTableConfigs} are read back equal to the originals.
+   */
+  @Test
+  public void testTableQueryConfigSerialization() throws IOException {
+    final JobConf conf = new JobConf();
+
+    final InputTableConfig firstConfig = sampleTableConfig();
+    final InputTableConfig secondConfig = sampleTableConfig();
+
+    final Map<String,InputTableConfig> configsByTable = new HashMap<String,InputTableConfig>();
+    configsByTable.put(TEST_TABLE_1, firstConfig);
+    configsByTable.put(TEST_TABLE_2, secondConfig);
+    AccumuloMultiTableInputFormat.setInputTableConfigs(conf, configsByTable);
+
+    assertEquals(firstConfig, AccumuloMultiTableInputFormat.getInputTableConfig(conf, TEST_TABLE_1));
+    assertEquals(secondConfig, AccumuloMultiTableInputFormat.getInputTableConfig(conf, TEST_TABLE_2));
+  }
+
+  /** Builds a table config with one range ("a" to "b"), one fetched column (CF1:CQ1) and one iterator setting (priority 50, "iter1"). */
+  private static InputTableConfig sampleTableConfig() {
+    return new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java
new file mode 100644
index 0000000..36054c8
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+/**
+ * Tests for the mapred {@link AccumuloOutputFormat}, run against a {@link MockInstance}.
+ */
+public class AccumuloOutputFormatTest {
+  private static AssertionError e1 = null;
+  private static final String PREFIX = AccumuloOutputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2";
+  
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Mapper that checks each input entry against the pattern written by the test and, on close, emits a single mutation holding the number of entries
+     * it saw. Assertion failures are stored in the enclosing class's static {@code e1} field instead of being thrown.
+     */
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      // Key seen by the previous map() call; stays null until the first entry has been mapped.
+      Key key = null;
+      // Number of entries mapped so far.
+      int count = 0;
+      // Collector captured in map() so that close() can still emit; remains null if map() is never invoked.
+      OutputCollector<Text,Mutation> finalOutput;
+
+      /**
+       * Verifies that the entry is row {@code count + 1} with value {@code count} (both formatted with {@code %09x}) and that the value equals the row of
+       * the previously mapped key.
+       */
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          // JUnit's convention is assertEquals(expected, actual); keeping that order makes failure messages read correctly.
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      /** No configuration is needed. */
+      @Override
+      public void configure(JobConf job) {}
+
+      /** Emits one mutation on row "total" whose value is the number of entries mapped, using the collector captured in {@link #map}. */
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+    
+    /**
+     * Configures and runs a map-only job that reads the input table with {@link AccumuloInputFormat} and writes to the output table with
+     * {@link AccumuloOutputFormat}, both on the mock instance.
+     *
+     * @param args
+     *          exactly four values: user, password, input table, output table
+     * @return 0 if the job reports success, 1 otherwise
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <inputtable> <outputtable>");
+      }
+
+      final String principal = args[0];
+      final String password = args[1];
+      final String inputTable = args[2];
+      final String outputTable = args[3];
+
+      final JobConf job = new JobConf(getConf());
+      job.setJarByClass(getClass());
+
+      // Input side.
+      job.setInputFormat(AccumuloInputFormat.class);
+      AccumuloInputFormat.setConnectorInfo(job, principal, new PasswordToken(password));
+      AccumuloInputFormat.setInputTableName(job, inputTable);
+      AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      // Mapper and output types.
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      // Output side; table creation is disabled, so the output table has to exist already.
+      AccumuloOutputFormat.setConnectorInfo(job, principal, new PasswordToken(password));
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, outputTable);
+      AccumuloOutputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      job.setNumReduceTasks(0);
+
+      if (JobClient.runJob(job).isSuccessful()) {
+        return 0;
+      }
+      return 1;
+    }
+    
+    /** Runs this tool through {@link ToolRunner} with the cached configuration and asserts that it exits with 0. */
+    public static void main(String[] args) throws Exception {
+      final int exitCode = ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args);
+      assertEquals(0, exitCode);
+    }
+  }
+  
+  /**
+   * Checks that batch writer options stored in a {@link JobConf} with {@code setBatchWriterOptions} are returned unchanged by
+   * {@code getBatchWriterOptions}.
+   */
+  @Test
+  public void testBWSettings() throws IOException {
+    // Settings used for the round trip; the test first asserts that none of them equals the default.
+    final long maxLatencyMs = 7654321L;
+    final long timeoutMs = 9898989L;
+    final int writeThreads = 42;
+    final long maxMemory = 1123581321L;
+
+    JobConf job = new JobConf();
+
+    // make sure we aren't testing defaults
+    final BatchWriterConfig defaults = new BatchWriterConfig();
+    assertNotEquals(maxLatencyMs, defaults.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertNotEquals(timeoutMs, defaults.getTimeout(TimeUnit.MILLISECONDS));
+    assertNotEquals(writeThreads, defaults.getMaxWriteThreads());
+    assertNotEquals(maxMemory, defaults.getMaxMemory());
+
+    final BatchWriterConfig expected = new BatchWriterConfig();
+    expected.setMaxLatency(maxLatencyMs, TimeUnit.MILLISECONDS);
+    expected.setTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+    expected.setMaxWriteThreads(writeThreads);
+    expected.setMaxMemory(maxMemory);
+    AccumuloOutputFormat.setBatchWriterOptions(job, expected);
+
+    // The anonymous subclass exists only so that getBatchWriterOptions can be called from inside the output format.
+    AccumuloOutputFormat verifier = new AccumuloOutputFormat() {
+      @Override
+      public void checkOutputSpecs(FileSystem ignored, JobConf conf) throws IOException {
+        BatchWriterConfig actual = getBatchWriterOptions(conf);
+
+        // passive check
+        assertEquals(expected.getMaxLatency(TimeUnit.MILLISECONDS), actual.getMaxLatency(TimeUnit.MILLISECONDS));
+        assertEquals(expected.getTimeout(TimeUnit.MILLISECONDS), actual.getTimeout(TimeUnit.MILLISECONDS));
+        assertEquals(expected.getMaxWriteThreads(), actual.getMaxWriteThreads());
+        assertEquals(expected.getMaxMemory(), actual.getMaxMemory());
+
+        // explicit check
+        assertEquals(maxLatencyMs, actual.getMaxLatency(TimeUnit.MILLISECONDS));
+        assertEquals(timeoutMs, actual.getTimeout(TimeUnit.MILLISECONDS));
+        assertEquals(writeThreads, actual.getMaxWriteThreads());
+        assertEquals(maxMemory, actual.getMaxMemory());
+      }
+    };
+    verifier.checkOutputSpecs(null, job);
+  }
+  
+  /**
+   * Writes 100 entries to the input table, runs the job, and checks that the mapper recorded no assertion failure and that the output table holds
+   * exactly one entry whose value is 100.
+   */
+  @Test
+  public void testMR() throws Exception {
+    MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+    Connector c = mockInstance.getConnector("root", new PasswordToken(""));
+    c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    // Row i+1 carries the value i, which is the pattern the mapper verifies.
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(TEST_TABLE_2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    // Expected value first, per JUnit's assertEquals(expected, actual) convention.
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java
new file mode 100644
index 0000000..a0ae0b3
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloRowInputFormatTest {
+  private static final String PREFIX = AccumuloRowInputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
+  
+  private static final String ROW1 = "row1";
+  private static final String ROW2 = "row2";
+  private static final String ROW3 = "row3";
+  private static final String COLF1 = "colf1";
+  private static List<Entry<Key,Value>> row1;
+  private static List<Entry<Key,Value>> row2;
+  private static List<Entry<Key,Value>> row3;
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+  
+  public AccumuloRowInputFormatTest() {
+    row1 = new ArrayList<Entry<Key,Value>>();
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes()));
+    row2 = new ArrayList<Entry<Key,Value>>();
+    row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes()));
+    row3 = new ArrayList<Entry<Key,Value>>();
+    row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
+  }
+  
+  public static void checkLists(final List<Entry<Key,Value>> first, final List<Entry<Key,Value>> second) {
+    assertEquals("Sizes should be the same.", first.size(), second.size());
+    for (int i = 0; i < first.size(); i++) {
+      assertEquals("Keys should be equal.", first.get(i).getKey(), second.get(i).getKey());
+      assertEquals("Values should be equal.", first.get(i).getValue(), second.get(i).getValue());
+    }
+  }
+  
+  /**
+   * Asserts that an iterator yields exactly the entries of a list, in order.
+   * <p>
+   * Both a surplus and a shortfall of entries are reported as an {@link AssertionError}, so callers that catch assertion failures see every kind of
+   * mismatch.
+   *
+   * @param first
+   *          the expected entries
+   * @param second
+   *          the iterator to drain and compare against {@code first}
+   */
+  public static void checkLists(final List<Entry<Key,Value>> first, final Iterator<Entry<Key,Value>> second) {
+    int entryIndex = 0;
+    while (second.hasNext()) {
+      final Entry<Key,Value> entry = second.next();
+      // Guard the lookup: without it an over-long iterator fails with IndexOutOfBoundsException rather than an assertion failure.
+      assertTrue("More entries than expected.", entryIndex < first.size());
+      assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey());
+      assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue());
+      entryIndex++;
+    }
+    // An iterator that ends early must not pass silently.
+    assertEquals("Sizes should be the same.", first.size(), entryIndex);
+  }
+  
+  /**
+   * Adds one mutation per list entry to the writer, carrying over the entry's row, column family, column qualifier, visibility, timestamp and value.
+   *
+   * @param writer
+   *          the batch writer that receives the mutations
+   * @param list
+   *          the entries to write
+   * @throws MutationsRejectedException
+   *           if the writer rejects a mutation
+   */
+  public static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) throws MutationsRejectedException {
+    final Iterator<Entry<Key,Value>> entries = list.iterator();
+    while (entries.hasNext()) {
+      final Entry<Key,Value> current = entries.next();
+      final Key source = current.getKey();
+      final Mutation update = new Mutation(source.getRow());
+      final ColumnVisibility visibility = new ColumnVisibility(source.getColumnVisibility());
+      update.put(source.getColumnFamily(), source.getColumnQualifier(), visibility, source.getTimestamp(), current.getValue());
+      writer.addMutation(update);
+    }
+  }
+  
+  private static class MRTester extends Configured implements Tool {
+    public static class TestMapper implements Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> {
+      int count = 0;
+      
+      @Override
+      public void map(Text k, PeekingIterator<Entry<Key,Value>> v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+        try {
+          switch (count) {
+            case 0:
+              assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
+              checkLists(row1, v);
+              break;
+            case 1:
+              assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
+              checkLists(row2, v);
+              break;
+            case 2:
+              assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
+              checkLists(row3, v);
+              break;
+            default:
+              assertTrue(false);
+          }
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        count++;
+      }
+      
+      @Override
+      public void configure(JobConf job) {}
+      
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(3, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+      
+    }
+    
+    /**
+     * Configures and runs a map-only job that reads the given table row by row with {@link AccumuloRowInputFormat} and discards its output.
+     *
+     * @param args
+     *          exactly three values: user, password, table
+     * @return 0 if the job reports success, 1 otherwise
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
+      }
+
+      final String principal = args[0];
+      final String password = args[1];
+      final String inputTable = args[2];
+
+      final JobConf job = new JobConf(getConf());
+      job.setJarByClass(getClass());
+
+      // Input side.
+      job.setInputFormat(AccumuloRowInputFormat.class);
+      AccumuloInputFormat.setConnectorInfo(job, principal, new PasswordToken(password));
+      AccumuloInputFormat.setInputTableName(job, inputTable);
+      AccumuloRowInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      // Mapper and (discarded) output.
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      if (JobClient.runJob(job).isSuccessful()) {
+        return 0;
+      }
+      return 1;
+    }
+    
+    /** Runs this tool through {@link ToolRunner} with the cached configuration and asserts that it exits with 0. */
+    public static void main(String[] args) throws Exception {
+      final int exitCode = ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args);
+      assertEquals(0, exitCode);
+    }
+  }
+  
+  /** Writes the three expected rows to a table on the mock instance, runs the job and checks that the mapper recorded no assertion failures. */
+  @Test
+  public void test() throws Exception {
+    final Connector conn = new MockInstance(INSTANCE_NAME).getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create(TEST_TABLE_1);
+
+    // The writer is created before the try block, so the finally clause needs no null check.
+    final BatchWriter writer = conn.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    try {
+      insertList(writer, row1);
+      insertList(writer, row2);
+      insertList(writer, row3);
+    } finally {
+      writer.close();
+    }
+
+    final String[] jobArgs = {"root", "", TEST_TABLE_1};
+    MRTester.main(jobArgs);
+    assertNull(e1);
+    assertNull(e2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java
new file mode 100644
index 0000000..0e1fe39
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
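+/**
+ * Tests running a mapred job whose Accumulo credentials are supplied through a token file, run against a {@link MockInstance}.
+ */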
+public class TokenFileTest {
+  private static AssertionError e1 = null;
+  private static final String PREFIX = TokenFileTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2";
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    /**
+     * Mapper that checks each input entry against the pattern written by the test and, on close, emits a single mutation holding the number of entries
+     * it saw. Assertion failures are stored in the enclosing class's static {@code e1} field instead of being thrown.
+     */
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      // Key seen by the previous map() call; stays null until the first entry has been mapped.
+      Key key = null;
+      // Number of entries mapped so far.
+      int count = 0;
+      // Collector captured in map() so that close() can still emit; remains null if map() is never invoked.
+      OutputCollector<Text,Mutation> finalOutput;
+
+      /**
+       * Verifies that the entry is row {@code count + 1} with value {@code count} (both formatted with {@code %09x}) and that the value equals the row of
+       * the previously mapped key.
+       */
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          // JUnit's convention is assertEquals(expected, actual); keeping that order makes failure messages read correctly.
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      /** No configuration is needed. */
+      @Override
+      public void configure(JobConf job) {}
+
+      /** Emits one mutation on row "total" whose value is the number of entries mapped, using the collector captured in {@link #map}. */
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    /**
+     * Configures and runs a map-only job that reads the input table and writes to the output table on the mock instance, passing a token file path as
+     * the connector credentials for both the input and the output format.
+     *
+     * @param args
+     *          exactly four values: user, token file path, input table, output table
+     * @return 0 if the job reports success, 1 otherwise
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName() + " <user> <token file> <inputtable> <outputtable>");
+      }
+
+      final String principal = args[0];
+      final String tokenFilePath = args[1];
+      final String inputTable = args[2];
+      final String outputTable = args[3];
+
+      final JobConf job = new JobConf(getConf());
+      job.setJarByClass(getClass());
+
+      // Input side.
+      job.setInputFormat(AccumuloInputFormat.class);
+      AccumuloInputFormat.setConnectorInfo(job, principal, tokenFilePath);
+      AccumuloInputFormat.setInputTableName(job, inputTable);
+      AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      // Mapper and output types.
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      // Output side; table creation is disabled, so the output table has to exist already.
+      AccumuloOutputFormat.setConnectorInfo(job, principal, tokenFilePath);
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, outputTable);
+      AccumuloOutputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      job.setNumReduceTasks(0);
+
+      if (JobClient.runJob(job).isSuccessful()) {
+        return 0;
+      }
+      return 1;
+    }
+
+    /**
+     * Runs the tester through {@link ToolRunner} and asserts that it exits with 0.
+     * <p>
+     * Before running, {@code hadoop.tmp.dir} is set to the parent directory of the token file given in {@code args[1]}.
+     *
+     * @param args
+     *          user, token file path, input table, output table
+     */
+    public static void main(String[] args) throws Exception {
+      // NOTE(review): the property is set directly on the object returned by CachedConfiguration.getInstance(); if that
+      // instance is shared, the setting stays visible to its other users - confirm this is intended.
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[1]).getParent());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  /**
+   * Writes 100 entries to the input table, stores serialized root credentials in a token file, runs the job with that token file, and checks that the
+   * mapper recorded no assertion failure and that the output table holds exactly one entry whose value is 100.
+   */
+  @Test
+  public void testMR() throws Exception {
+    MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+    Connector c = mockInstance.getConnector("root", new PasswordToken(""));
+    c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    // Row i+1 carries the value i, which is the pattern the mapper verifies.
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    File tf = folder.newFile("root_test.pw");
+    PrintStream out = new PrintStream(tf);
+    try {
+      String outString = new Credentials("root", new PasswordToken("")).serialize();
+      out.println(outString);
+    } finally {
+      // Close the stream even if serialization or writing fails.
+      out.close();
+    }
+
+    MRTokenFileTester.main(new String[] {"root", tf.getAbsolutePath(), TEST_TABLE_1, TEST_TABLE_2});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(TEST_TABLE_2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    // Expected value first, per JUnit's assertEquals(expected, actual) convention.
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
new file mode 100644
index 0000000..2a453e3
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AccumuloFileOutputFormatTest {
+  // Every name below starts with the simple name of this test class; the remaining constants are the mock instance name and the table names.
+  private static final String PREFIX = AccumuloFileOutputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
+  private static final String BAD_TABLE = PREFIX + "_mapreduce_bad_table";
+  private static final String TEST_TABLE = PREFIX + "_mapreduce_test_table";
+  private static final String EMPTY_TABLE = PREFIX + "_mapreduce_empty_table";
+
+  // Assertion failures captured inside MRTester.BadKeyMapper (e1 by map(), e2 by cleanup()); writeBadVisibility() asserts that both are still null.
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  // Scratch folder for job output, created under <user.dir>/target.
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  /**
+   * Populates the mock instance once for the whole class: creates {@code EMPTY_TABLE}, {@code TEST_TABLE} and {@code BAD_TABLE}, then writes one mutation
+   * to {@code TEST_TABLE} and one to {@code BAD_TABLE}. {@code EMPTY_TABLE} is left without data.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    Connector conn = new MockInstance(INSTANCE_NAME).getConnector("root", new PasswordToken(""));
+    for (String table : new String[] {EMPTY_TABLE, TEST_TABLE, BAD_TABLE}) {
+      conn.tableOperations().create(table);
+    }
+
+    // TEST_TABLE: a single row "Key" with three empty strings as column arguments.
+    Mutation single = new Mutation("Key");
+    single.put("", "", "");
+    BatchWriter testWriter = conn.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+    testWriter.addMutation(single);
+    testWriter.close();
+
+    // BAD_TABLE: row "r1" with cf1:cq1 put twice and cf1:cq2 put once.
+    // NOTE(review): with three string arguments the last one looks like the value rather than a column visibility -- confirm this matches what
+    // writeBadVisibility() is meant to exercise.
+    Mutation bad = new Mutation("r1");
+    bad.put("cf1", "cq1", "A&B");
+    bad.put("cf1", "cq1", "A&B");
+    bad.put("cf1", "cq2", "A&");
+    BatchWriter badWriter = conn.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+    badWriter.addMutation(bad);
+    badWriter.close();
+  }
+
+  /**
+   * Runs the write check with {@code content == false}, i.e. against {@code EMPTY_TABLE}, where {@code handleWriteTests} expects no part files.
+   */
+  @Test
+  public void testEmptyWrite() throws Exception {
+    handleWriteTests(false);
+  }
+
+  /**
+   * Runs the write check with {@code content == true}, i.e. against {@code TEST_TABLE}, where {@code handleWriteTests} expects exactly one part file.
+   */
+  @Test
+  public void testRealWrite() throws Exception {
+    handleWriteTests(true);
+  }
+
+  /**
+   * Map-only job that reads one table of the mock instance through {@link AccumuloInputFormat} and writes it with {@link AccumuloFileOutputFormat}.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Mapper installed when the input table is {@code BAD_TABLE}. It forwards every entry and stores unexpected outcomes in {@code e1} (from
+     * {@code map}) and {@code e2} (from {@code cleanup}) instead of letting them propagate.
+     */
+    private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
+      // Number of entries this mapper instance has processed so far.
+      int index = 0;
+
+      @Override
+      protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+        try {
+          try {
+            context.write(key, value);
+            if (index == 2) {
+              // A successful write at index 2 is treated as a failure. Throw with a message; assertTrue(false) would fail without one.
+              throw new AssertionError("write of the entry at index 2 succeeded but was expected to fail");
+            }
+          } catch (Exception e) {
+            // A failing write is tolerated only for the entry at index 2.
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          // Kept for the test method to inspect once the job has finished.
+          e1 = e;
+        }
+        index++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          // Exactly two entries must have been mapped.
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    /**
+     * Configures and runs the job.
+     *
+     * @param args
+     *          exactly four values: user, password, input table, output path
+     * @return 0 if the job reports success, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly four values
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <outputfile>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table = args[2];
+
+      @SuppressWarnings("deprecation")
+      Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
+      AccumuloFileOutputFormat.setOutputPath(job, new Path(args[3]));
+
+      // Only BAD_TABLE gets the checking mapper; every other table is run through the stock Mapper class.
+      job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : Mapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the tool through {@link ToolRunner} with the cached Hadoop configuration and asserts that it returns 0.
+     */
+    public static void main(String[] args) throws Exception {
+      assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
+    }
+  }
+
+  /**
+   * Runs the job over {@code TEST_TABLE} or {@code EMPTY_TABLE} and checks the {@code part-m-*} files found in the output directory.
+   *
+   * @param content
+   *          {@code true} to read {@code TEST_TABLE} and expect exactly one part file; {@code false} to read {@code EMPTY_TABLE} and expect none
+   */
+  public void handleWriteTests(boolean content) throws Exception {
+    File f = folder.newFile("handleWriteTests");
+    // Only the path is wanted: the placeholder file is removed before the path is passed to the job as its output location. Fail here if the
+    // removal does not happen instead of carrying on with a stale file.
+    assertTrue(f.delete());
+    MRTester.main(new String[] {"root", "", content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    // listFiles() returns null for anything that is not a directory; assert that first so a bad output path is reported as a test failure
+    // rather than as a NullPointerException further down.
+    assertTrue(f.isDirectory());
+    File[] files = f.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.getName().startsWith("part-m-");
+      }
+    });
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  /**
+   * Runs the job over {@code BAD_TABLE}, which makes {@code MRTester} install its {@code BadKeyMapper}, and checks that the mapper recorded no
+   * assertion failure in {@code e1} or {@code e2}.
+   */
+  @Test
+  public void writeBadVisibility() throws Exception {
+    File f = folder.newFile("writeBadVisibility");
+    // Only the path is wanted: the placeholder file is removed before the path is passed to the job as its output location. Fail here if the
+    // removal does not happen instead of carrying on with a stale file.
+    assertTrue(f.delete());
+    MRTester.main(new String[] {"root", "", BAD_TABLE, f.getAbsolutePath()});
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  /**
+   * Verifies that the file output settings stored on a job are returned by {@code AccumuloFileOutputFormat.getAccumuloConfiguration}. Two separate
+   * jobs with different values are used.
+   */
+  @Test
+  public void validateConfiguration() throws IOException, InterruptedException {
+    assertConfigurationRoundTrip(7, 300L, 50L, 10L, "snappy");
+    assertConfigurationRoundTrip(17, 1300L, 150L, 110L, "lzo");
+  }
+
+  /**
+   * Applies the given settings to a fresh job and asserts that each one is read back unchanged from the job's Accumulo configuration.
+   *
+   * @param replication
+   *          value passed to {@code setReplication}, read back from {@code TABLE_FILE_REPLICATION}
+   * @param fileBlockSize
+   *          value passed to {@code setFileBlockSize}, read back from {@code TABLE_FILE_BLOCK_SIZE}
+   * @param dataBlockSize
+   *          value passed to {@code setDataBlockSize}, read back from {@code TABLE_FILE_COMPRESSED_BLOCK_SIZE}
+   * @param indexBlockSize
+   *          value passed to {@code setIndexBlockSize}, read back from {@code TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX}
+   * @param compressionType
+   *          value passed to {@code setCompressionType}, read back from {@code TABLE_FILE_COMPRESSION_TYPE}
+   */
+  private static void assertConfigurationRoundTrip(int replication, long fileBlockSize, long dataBlockSize, long indexBlockSize, String compressionType)
+      throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+    AccumuloFileOutputFormat.setReplication(job, replication);
+    AccumuloFileOutputFormat.setFileBlockSize(job, fileBlockSize);
+    AccumuloFileOutputFormat.setDataBlockSize(job, dataBlockSize);
+    AccumuloFileOutputFormat.setIndexBlockSize(job, indexBlockSize);
+    AccumuloFileOutputFormat.setCompressionType(job, compressionType);
+
+    AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
+
+    // Compare against the values that were set, not against repeated literals, so the two can never drift apart.
+    assertEquals(replication, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    assertEquals(fileBlockSize, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    assertEquals(dataBlockSize, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    assertEquals(indexBlockSize, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    assertEquals(compressionType, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
new file mode 100644
index 0000000..2500972
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloInputFormatTest {
+
+  private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName();
+
+  /**
+   * Adds one iterator to a job and checks that the value stored under {@code AccumuloInputFormat.ScanOpts.Iterators} in the job configuration is the
+   * Base64 encoding of that iterator setting's serialized form.
+   */
+  @Test
+  public void testSetIterator() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+    IteratorSetting wholeRow = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
+    AccumuloInputFormat.addIterator(job, wholeRow);
+
+    // Build the expected string by serializing the same setting by hand.
+    ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+    wholeRow.write(new DataOutputStream(serialized));
+    String expected = new String(Base64.encodeBase64(serialized.toByteArray()));
+
+    Configuration conf = job.getConfiguration();
+    assertEquals(expected, conf.get("AccumuloInputFormat.ScanOpts.Iterators"));
+  }
+
+  /**
+   * Adds three iterators to a job (one given by class, two by class name, the last with two options) and checks that {@code getIterators} returns them
+   * in the same order with priority, class name, name and options intact.
+   */
+  @Test
+  public void testAddIterator() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
+    iter.addOption("v1", "1");
+    // Option value containing a NUL character, a colon and a backslash.
+    iter.addOption("junk", "\0omg:!\\xyzzy");
+    AccumuloInputFormat.addIterator(job, iter);
+
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
+
+    // Check the list size; assertEquals reports the actual size on failure, which assertTrue(size == 3) cannot do.
+    assertEquals(3, list.size());
+
+    // Walk the list and make sure our settings are correct
+    IteratorSetting setting = list.get(0);
+    assertEquals(1, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass());
+    assertEquals("WholeRow", setting.getName());
+    assertEquals(0, setting.getOptions().size());
+
+    setting = list.get(1);
+    assertEquals(2, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
+    assertEquals("Versions", setting.getName());
+    assertEquals(0, setting.getOptions().size());
+
+    setting = list.get(2);
+    assertEquals(3, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
+    assertEquals("Count", setting.getName());
+    assertEquals(2, setting.getOptions().size());
+    assertEquals("1", setting.getOptions().get("v1"));
+    assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk"));
+  }
+
+  /**
+   * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There
+   * should be no exceptions thrown when trying to parse these types of option entries.
+   *
+   * This test makes sure that the option values read back from the Job through {@code getIterators} are equal to the values that were added.
+   */
+  @Test
+  public void testIteratorOptionEncoding() throws Throwable {
+    String key = "colon:delimited:key";
+    String value = "comma,delimited,value";
+    IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
+    someSetting.addOption(key, value);
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+    AccumuloInputFormat.addIterator(job, someSetting);
+
+    // Expected value first throughout, so that a failure message labels both sides correctly.
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
+    assertEquals(1, list.size());
+    assertEquals(1, list.get(0).getOptions().size());
+    assertEquals(value, list.get(0).getOptions().get(key));
+
+    // Reuse the same setting object under a new name and priority with one more option, and add it as a second iterator.
+    someSetting.addOption(key + "2", value);
+    someSetting.setPriority(2);
+    someSetting.setName("it2");
+    AccumuloInputFormat.addIterator(job, someSetting);
+    list = AccumuloInputFormat.getIterators(job);
+    assertEquals(2, list.size());
+    // The first stored iterator must still have only its original option.
+    assertEquals(1, list.get(0).getOptions().size());
+    assertEquals(value, list.get(0).getOptions().get(key));
+    assertEquals(2, list.get(1).getOptions().size());
+    assertEquals(value, list.get(1).getOptions().get(key));
+    assertEquals(value, list.get(1).getOptions().get(key + "2"));
+  }
+
+  /**
+   * Test getting iterator settings for multiple iterators set: three iterators are added by class name and must come back from {@code getIterators} in
+   * the same order with priority, class name and name intact.
+   */
+  @Test
+  public void testGetIteratorSettings() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
+
+    List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
+
+    // Check the list size; assertEquals reports the actual size on failure, which assertTrue(size == 3) cannot do.
+    assertEquals(3, list.size());
+
+    // Walk the list and make sure our settings are correct
+    IteratorSetting setting = list.get(0);
+    assertEquals(1, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
+    assertEquals("WholeRow", setting.getName());
+
+    setting = list.get(1);
+    assertEquals(2, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
+    assertEquals("Versions", setting.getName());
+
+    setting = list.get(2);
+    assertEquals(3, setting.getPriority());
+    assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
+    assertEquals("Count", setting.getName());
+
+  }
+
+  /**
+   * Adds a {@link RegExFilter} whose iterator name is a string of quote, percent, angle-bracket and backslash characters and checks that the name
+   * comes back unchanged from {@code getIterators}.
+   */
+  @Test
+  public void testSetRegex() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+
+    String regex = ">\"*%<>\'\\";
+
+    // The same string is used both as the iterator name and as the first regex argument.
+    IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
+    RegExFilter.setRegexs(is, regex, null, null, null, false);
+    AccumuloInputFormat.addIterator(job, is);
+
+    // assertEquals shows both strings on failure, which assertTrue(a.equals(b)) cannot do.
+    assertEquals(regex, AccumuloInputFormat.getIterators(job).get(0).getName());
+  }
+
+  // Assertion failures captured inside MRTester.TestMapper: e1 by map(), e2 by cleanup(). The job-running tests assert that both are still null.
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  /**
+   * Map-only job that reads one table of a mock instance with a caller-chosen input format class and uses {@link NullOutputFormat} for output.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Checks the order and content of the entries it receives and that exactly 100 of them arrive. Failures are stored in {@code e1} (from
+     * {@code map}) and {@code e2} (from {@code cleanup}) instead of being thrown.
+     */
+    private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+      // Copy of the previously mapped key; null until the first entry has been seen.
+      Key key = null;
+      // Number of entries mapped so far.
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          // Each value must repeat the row id of the entry that came before it.
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          // Expected value first, so that a failure message labels both sides correctly.
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    /**
+     * Configures and runs the job.
+     *
+     * @param args
+     *          exactly five values: user, password, table, mock instance name, input format class name
+     * @return 0 if the job reports success, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly five values
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 5) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <instanceName> <inputFormatClass>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table = args[2];
+
+      String instanceName = args[3];
+      String inputFormatClassName = args[4];
+      // The input format is loaded by name so that each test can pick its own implementation.
+      @SuppressWarnings("unchecked")
+      Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends InputFormat<?,?>>) Class.forName(inputFormatClassName);
+
+      @SuppressWarnings("deprecation")
+      Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(inputFormatClass);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setMockInstance(job, instanceName);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the tool through {@link ToolRunner} with the cached Hadoop configuration and returns its exit code, leaving the check to the caller.
+     */
+    public static int main(String[] args) throws Exception {
+      return ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args);
+    }
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
+    final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
+
+    MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+    Connector c = mockInstance.getConnector("root", new PasswordToken(""));
+    c.tableOperations().create(TEST_TABLE_1);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(0, MRTester.main(new String[] {"root", "", TEST_TABLE_1, INSTANCE_NAME, AccumuloInputFormat.class.getCanonicalName()}));
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  /**
+   * Configures a job with connector info, table, authorizations, mock instance, isolation, local iterators, fetched columns and log level, then
+   * checks that the single split returned by {@code getSplits} is a {@link RangeInputSplit} carrying each of those values.
+   */
+  @Test
+  public void testCorrectRangeInputSplits() throws Exception {
+    @SuppressWarnings("deprecation")
+    Job job = new Job(new Configuration(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+
+    // Values that should come back from the split.
+    String username = "user";
+    String table = "table";
+    String instance = "instance";
+    PasswordToken password = new PasswordToken("password");
+    Authorizations auths = new Authorizations("foo");
+    Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
+    boolean isolated = true;
+    boolean localIters = true;
+    Level level = Level.WARN;
+
+    // The table has to exist in the mock instance before splits are requested.
+    Instance inst = new MockInstance(instance);
+    Connector connector = inst.getConnector(username, password);
+    connector.tableOperations().create(table);
+
+    AccumuloInputFormat.setConnectorInfo(job, username, password);
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloInputFormat.setMockInstance(job, instance);
+    AccumuloInputFormat.setScanIsolation(job, isolated);
+    AccumuloInputFormat.setLocalIterators(job, localIters);
+    AccumuloInputFormat.fetchColumns(job, fetchColumns);
+    AccumuloInputFormat.setLogLevel(job, level);
+
+    List<InputSplit> splits = new AccumuloInputFormat().getSplits(job);
+    Assert.assertEquals(1, splits.size());
+
+    InputSplit onlySplit = splits.get(0);
+    Assert.assertEquals(RangeInputSplit.class, onlySplit.getClass());
+    RangeInputSplit rangeSplit = (RangeInputSplit) onlySplit;
+
+    Assert.assertEquals(username, rangeSplit.getPrincipal());
+    Assert.assertEquals(table, rangeSplit.getTableName());
+    Assert.assertEquals(password, rangeSplit.getToken());
+    Assert.assertEquals(auths, rangeSplit.getAuths());
+    Assert.assertEquals(instance, rangeSplit.getInstanceName());
+    Assert.assertEquals(isolated, rangeSplit.isIsolatedScan());
+    Assert.assertEquals(localIters, rangeSplit.usesLocalIterators());
+    Assert.assertEquals(fetchColumns, rangeSplit.getFetchedColumns());
+    Assert.assertEquals(level, rangeSplit.getLogLevel());
+  }
+
+  @Test
+  public void testPartialInputSplitDelegationToConfiguration() throws Exception {
+    String user = "testPartialInputSplitUser";
+    PasswordToken password = new PasswordToken("");
+
+    MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration");
+    Connector c = mockInstance.getConnector(user, password);
+    c.tableOperations().create("testtable");
+    BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(
+        0,
+        MRTester.main(new String[] {user, "", "testtable", "testPartialInputSplitDelegationToConfiguration",
+            EmptySplitsAccumuloInputFormat.class.getCanonicalName()}));
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  @Test
+  public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception {
+    String user = "testPartialFailedInputSplit";
+    PasswordToken password = new PasswordToken("");
+
+    MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration");
+    Connector c = mockInstance.getConnector(user, password);
+    c.tableOperations().create("testtable");
+    BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    // We should fail before we even get into the Mapper because we can't make the RecordReader
+    Assert.assertEquals(
+        1,
+        MRTester.main(new String[] {user, "", "testtable", "testPartialFailedInputSplitDelegationToConfiguration",
+            BadPasswordSplitsAccumuloInputFormat.class.getCanonicalName()}));
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  /**
+   * Stores a set of fetch columns that mixes empty families, empty qualifiers and a null qualifier on a job and checks that
+   * {@code getFetchedColumns} returns an equal set.
+   */
+  @Test
+  public void testEmptyColumnFamily() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+
+    Text empty = new Text("");
+    Set<Pair<Text,Text>> expected = new HashSet<Pair<Text,Text>>();
+    expected.add(new Pair<Text,Text>(empty, null));
+    expected.add(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
+    expected.add(new Pair<Text,Text>(empty, new Text("bar")));
+    expected.add(new Pair<Text,Text>(empty, new Text("")));
+    expected.add(new Pair<Text,Text>(new Text("foo"), new Text("")));
+
+    AccumuloInputFormat.fetchColumns(job, expected);
+    Set<Pair<Text,Text>> actual = AccumuloInputFormat.getFetchedColumns(job);
+    assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
new file mode 100644
index 0000000..05fbbb4
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloMultiTableInputFormatTest {
+
+  // Every name below starts with the simple name of this test class; the remaining constants are the mock instance name and the two table names.
+  private static final String PREFIX = AccumuloMultiTableInputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
+
+  // Assertion failures captured inside MRTester.TestMapper: e1 by map(), e2 by cleanup(). testMap() asserts that both are still null.
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static class MRTester extends Configured implements Tool {
+
+    private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          String tableName = ((RangeInputSplit) context.getInputSplit()).getTableName();
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table1> <table2>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table1 = args[2];
+      String table2 = args[3];
+
+      @SuppressWarnings("deprecation")
+      Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloMultiTableInputFormat.class);
+
+      AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+
+      InputTableConfig tableConfig1 = new InputTableConfig();
+      InputTableConfig tableConfig2 = new InputTableConfig();
+
+      Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
+      configMap.put(table1, tableConfig1);
+      configMap.put(table2, tableConfig2);
+
+      AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
+      AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
+    }
+  }
+
+  /**
+   * Generate incrementing counts and attach table name to the key/value so that order and multi-table data can be verified.
+   */
+  @Test
+  public void testMap() throws Exception {
+    MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+    Connector c = mockInstance.getConnector("root", new PasswordToken(""));
+    c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes()));
+      bw2.addMutation(t2m);
+    }
+    bw.close();
+    bw2.close();
+
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  /**
+   * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+   */
+  @Test
+  public void testInputTableConfigSerialization() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+
+    InputTableConfig tableConfig = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b")))
+        .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
+        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
+
+    Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
+    configMap.put(TEST_TABLE_1, tableConfig);
+    configMap.put(TEST_TABLE_2, tableConfig);
+
+    AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
+
+    assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_1));
+    assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_2));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
new file mode 100644
index 0000000..a0cb4e3
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+/**
+ * Tests for {@link AccumuloOutputFormat}: checks that batch writer options stored on a job can be read back, and runs a map-only job against a
+ * {@link MockInstance} that reads {@code TEST_TABLE_1} and writes a single summary mutation to {@code TEST_TABLE_2}.
+ */
+public class AccumuloOutputFormatTest {
+  // First assertion failure raised inside the mapper, or null if there was none. The mapper stores it here and testMR() checks it once the job has finished.
+  private static AssertionError e1 = null;
+  private static final String PREFIX = AccumuloOutputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
+  private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
+
+  /**
+   * Map-only job that reads the input table through {@link AccumuloInputFormat} and writes to the output table through {@link AccumuloOutputFormat}.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Verifies every entry it is handed and, on cleanup, emits one mutation recording how many entries were seen.
+     */
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      // Copy of the previously mapped key, or null before the first call to map().
+      Key key = null;
+      // Number of entries mapped so far.
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          // The value of each entry must equal the row of the entry that preceded it.
+          if (key != null) {
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          }
+          // Expected value first, so that a failure message labels the two sides correctly.
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          // Recorded rather than rethrown; testMR() asserts on it after the job. Only the first failure is kept so later ones cannot mask it.
+          if (e1 == null) {
+            e1 = e;
+          }
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        // Emit the number of mapped entries as the value of a single "total" row.
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    /**
+     * Configures and runs the job.
+     *
+     * @param args
+     *          exactly four values: user, password, input table, output table
+     * @return 0 if the job was successful, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly four values
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <inputtable> <outputtable>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table1 = args[2];
+      String table2 = args[3];
+
+      @SuppressWarnings("deprecation")
+      Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloInputFormat.setInputTableName(job, table1);
+      AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      job.setMapperClass(TestMapper.class);
+      // NOTE(review): TestMapper is declared to emit Text/Mutation, not Key/Value. With zero reduce tasks these map-output classes are presumably unused;
+      // confirm before relying on them.
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, table2);
+      AccumuloOutputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the job through {@link ToolRunner} and asserts that it returned 0.
+     */
+    public static void main(String[] args) throws Exception {
+      assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
+    }
+  }
+
+  /**
+   * Stores non-default {@link BatchWriterConfig} values on a job and checks that {@code getBatchWriterOptions} returns the same values.
+   */
+  @Test
+  public void testBWSettings() throws IOException {
+    @SuppressWarnings("deprecation")
+    Job job = new Job();
+
+    // make sure we aren't testing defaults
+    final BatchWriterConfig bwDefaults = new BatchWriterConfig();
+    assertNotEquals(7654321L, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertNotEquals(9898989L, bwDefaults.getTimeout(TimeUnit.MILLISECONDS));
+    assertNotEquals(42, bwDefaults.getMaxWriteThreads());
+    assertNotEquals(1123581321L, bwDefaults.getMaxMemory());
+
+    final BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(7654321L, TimeUnit.MILLISECONDS);
+    bwConfig.setTimeout(9898989L, TimeUnit.MILLISECONDS);
+    bwConfig.setMaxWriteThreads(42);
+    bwConfig.setMaxMemory(1123581321L);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+    // checkOutputSpecs is overridden so that the assertions can call getBatchWriterOptions from inside a subclass.
+    AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
+      @Override
+      public void checkOutputSpecs(JobContext context) throws IOException {
+        BatchWriterConfig bwOpts = getBatchWriterOptions(context);
+
+        // passive check
+        assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
+        assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), bwOpts.getTimeout(TimeUnit.MILLISECONDS));
+        assertEquals(bwConfig.getMaxWriteThreads(), bwOpts.getMaxWriteThreads());
+        assertEquals(bwConfig.getMaxMemory(), bwOpts.getMaxMemory());
+
+        // explicit check
+        assertEquals(7654321L, bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
+        assertEquals(9898989L, bwOpts.getTimeout(TimeUnit.MILLISECONDS));
+        assertEquals(42, bwOpts.getMaxWriteThreads());
+        assertEquals(1123581321L, bwOpts.getMaxMemory());
+
+      }
+    };
+    myAOF.checkOutputSpecs(job);
+  }
+
+  /**
+   * Writes 100 entries to the input table, runs the map-only job, and checks that the output table holds exactly one entry whose value is 100.
+   */
+  @Test
+  public void testMR() throws Exception {
+    MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+    Connector c = mockInstance.getConnector("root", new PasswordToken(""));
+    c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    // Row i+1 carries value i (both as zero-padded hex), which is the relationship TestMapper verifies.
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(TEST_TABLE_2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    // Expected value first, so that a failure message labels the two sides correctly.
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
new file mode 100644
index 0000000..2207437
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+/**
+ * Tests {@link AccumuloRowInputFormat} by writing three rows to a table on a {@link MockInstance} and running a map-only job whose mapper checks that each
+ * row arrives in order with the expected entries.
+ */
+public class AccumuloRowInputFormatTest {
+  private static final String PREFIX = AccumuloRowInputFormatTest.class.getSimpleName();
+  private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
+  private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
+
+  private static final String ROW1 = "row1";
+  private static final String ROW2 = "row2";
+  private static final String ROW3 = "row3";
+  private static final String COLF1 = "colf1";
+  // Expected entries per row. They are static because the mapper, a static nested class, reads them directly.
+  private static final List<Entry<Key,Value>> row1 = new ArrayList<Entry<Key,Value>>();
+  private static final List<Entry<Key,Value>> row2 = new ArrayList<Entry<Key,Value>>();
+  private static final List<Entry<Key,Value>> row3 = new ArrayList<Entry<Key,Value>>();
+  // First assertion failure raised in TestMapper.map() and TestMapper.cleanup() respectively, or null if there was none.
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  // The fixtures are populated once per class load, not once per test instance, since they are static state.
+  static {
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes()));
+    row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes()));
+    row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
+  }
+
+  /**
+   * Asserts that both lists have the same size and hold equal keys and values at every index.
+   *
+   * @param first
+   *          the expected entries
+   * @param second
+   *          the actual entries
+   */
+  public static void checkLists(final List<Entry<Key,Value>> first, final List<Entry<Key,Value>> second) {
+    assertEquals("Sizes should be the same.", first.size(), second.size());
+    for (int i = 0; i < first.size(); i++) {
+      assertEquals("Keys should be equal.", first.get(i).getKey(), second.get(i).getKey());
+      assertEquals("Values should be equal.", first.get(i).getValue(), second.get(i).getValue());
+    }
+  }
+
+  /**
+   * Asserts that the iterator yields exactly the entries of the list, in order. The iterator is consumed up to the first mismatch, or completely if there is
+   * none.
+   *
+   * @param first
+   *          the expected entries
+   * @param second
+   *          iterator over the actual entries
+   */
+  public static void checkLists(final List<Entry<Key,Value>> first, final Iterator<Entry<Key,Value>> second) {
+    int entryIndex = 0;
+    while (second.hasNext()) {
+      final Entry<Key,Value> entry = second.next();
+      // Fail with an assertion, not an IndexOutOfBoundsException, when the iterator yields more entries than expected.
+      assertTrue("Should not have more than " + first.size() + " entries", entryIndex < first.size());
+      assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey());
+      assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue());
+      entryIndex++;
+    }
+    // Without this check an iterator that yields too few entries (or none) would pass unnoticed.
+    assertEquals("Number of entries should be the same.", first.size(), entryIndex);
+  }
+
+  /**
+   * Adds one mutation per entry of the list to the writer, preserving each key's row, column, visibility and timestamp.
+   *
+   * @param writer
+   *          the writer the mutations are added to; it is not closed here
+   * @param list
+   *          the entries to write
+   */
+  public static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) throws MutationsRejectedException {
+    for (Entry<Key,Value> e : list) {
+      final Key key = e.getKey();
+      final Mutation mutation = new Mutation(key.getRow());
+      ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility());
+      mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, key.getTimestamp(), e.getValue());
+      writer.addMutation(mutation);
+    }
+  }
+
+  /**
+   * Map-only job that reads the given table through {@link AccumuloRowInputFormat} and discards its output.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Checks that the rows arrive as {@code row1}, {@code row2}, {@code row3}, in that order, and that exactly three rows were mapped.
+     */
+    private static class TestMapper extends Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> {
+      // Number of rows mapped so far.
+      int count = 0;
+
+      @Override
+      protected void map(Text k, PeekingIterator<Entry<Key,Value>> v, Context context) throws IOException, InterruptedException {
+        try {
+          switch (count) {
+            case 0:
+              assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
+              checkLists(row1, v);
+              break;
+            case 1:
+              assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
+              checkLists(row2, v);
+              break;
+            case 2:
+              assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
+              checkLists(row3, v);
+              break;
+            default:
+              throw new AssertionError("Unexpected row " + k + " after the three expected rows");
+          }
+        } catch (AssertionError e) {
+          // Recorded rather than rethrown; test() asserts on it after the job. Only the first failure is kept so later ones cannot mask it.
+          if (e1 == null) {
+            e1 = e;
+          }
+        }
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(3, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    /**
+     * Configures and runs the job.
+     *
+     * @param args
+     *          exactly three values: user, password, table
+     * @return 0 if the job was successful, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly three values
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table = args[2];
+
+      @SuppressWarnings("deprecation")
+      Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloRowInputFormat.class);
+
+      // Configure everything through the input format class that the job actually uses.
+      AccumuloRowInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloRowInputFormat.setInputTableName(job, table);
+      AccumuloRowInputFormat.setMockInstance(job, INSTANCE_NAME);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the job through {@link ToolRunner} and asserts that it returned 0.
+     */
+    public static void main(String[] args) throws Exception {
+      assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
+    }
+  }
+
+  /**
+   * Writes the three fixture rows to a new table, runs the job, and checks that neither the mapper nor its cleanup recorded an assertion failure.
+   */
+  @Test
+  public void test() throws Exception {
+    final MockInstance instance = new MockInstance(INSTANCE_NAME);
+    final Connector conn = instance.getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create(TEST_TABLE_1);
+    BatchWriter writer = null;
+    try {
+      writer = conn.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+      insertList(writer, row1);
+      insertList(writer, row2);
+      insertList(writer, row3);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1});
+    assertNull(e1);
+    assertNull(e2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/99baad37/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java
new file mode 100644
index 0000000..fce7781
--- /dev/null
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+/**
+ * AccumuloInputFormat which replaces the token of every split it returns with a {@link PasswordToken} for the password "anythingelse".
+ */
+public class BadPasswordSplitsAccumuloInputFormat extends AccumuloInputFormat {
+
+  /**
+   * Computes the splits as the superclass does, then overwrites the token on each of them.
+   *
+   * @return the superclass's splits, each now carrying a new "anythingelse" password token
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    final List<InputSplit> computed = super.getSplits(context);
+
+    // NOTE(review): the fully-qualified type name is presumably deliberate (to avoid resolving to a same-named nested type inherited from a superclass);
+    // confirm before shortening it.
+    final int numSplits = computed.size();
+    for (int i = 0; i < numSplits; i++) {
+      ((org.apache.accumulo.core.client.mapreduce.RangeInputSplit) computed.get(i)).setToken(new PasswordToken("anythingelse"));
+    }
+
+    return computed;
+  }
+}