You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/07/30 23:51:38 UTC

[04/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
new file mode 100644
index 0000000..11fb6b0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientSideIteratorIT extends AccumuloClusterHarness {
+  private List<Key> resultSet1;
+  private List<Key> resultSet2;
+  private List<Key> resultSet3;
+
+  @Before
+  public void setupData() {
+    resultSet1 = new ArrayList<Key>();
+    resultSet1.add(new Key("row1", "colf", "colq", 4l));
+    resultSet1.add(new Key("row1", "colf", "colq", 3l));
+    resultSet2 = new ArrayList<Key>();
+    resultSet2.add(new Key("row1", "colf", "colq", 4l));
+    resultSet2.add(new Key("row1", "colf", "colq", 3l));
+    resultSet2.add(new Key("row1", "colf", "colq", 2l));
+    resultSet2.add(new Key("row1", "colf", "colq", 1l));
+    resultSet3 = new ArrayList<Key>();
+    resultSet3.add(new Key("part1", "", "doc2"));
+    resultSet3.add(new Key("part2", "", "DOC2"));
+  }
+
+  private void checkResults(final Iterable<Entry<Key,Value>> scanner, final List<Key> results, final PartialKey pk) {
+    int i = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      assertTrue(entry.getKey().equals(results.get(i++), pk));
+    }
+    assertEquals(i, results.size());
+  }
+
+  private Connector conn;
+  private String tableName;
+
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    tableName = getUniqueNames(1)[0];
+  }
+
+  @Test
+  public void testIntersect() throws Exception {
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("part1");
+    m.put("bar", "doc1", "value");
+    m.put("bar", "doc2", "value");
+    m.put("dog", "doc3", "value");
+    m.put("foo", "doc2", "value");
+    m.put("foo", "doc3", "value");
+    bw.addMutation(m);
+    m = new Mutation("part2");
+    m.put("bar", "DOC1", "value");
+    m.put("bar", "DOC2", "value");
+    m.put("dog", "DOC3", "value");
+    m.put("foo", "DOC2", "value");
+    m.put("foo", "DOC3", "value");
+    bw.addMutation(m);
+    bw.flush();
+
+    final ClientSideIteratorScanner csis = new ClientSideIteratorScanner(conn.createScanner(tableName, new Authorizations()));
+    final IteratorSetting si = new IteratorSetting(10, tableName, IntersectingIterator.class);
+    IntersectingIterator.setColumnFamilies(si, new Text[] {new Text("bar"), new Text("foo")});
+    csis.addScanIterator(si);
+
+    checkResults(csis, resultSet3, PartialKey.ROW_COLFAM_COLQUAL);
+  }
+
+  @Test
+  public void testVersioning() throws Exception {
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().removeProperty(tableName, "table.iterator.scan.vers");
+    conn.tableOperations().removeProperty(tableName, "table.iterator.majc.vers");
+    conn.tableOperations().removeProperty(tableName, "table.iterator.minc.vers");
+    final BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row1");
+    m.put("colf", "colq", 1l, "value");
+    m.put("colf", "colq", 2l, "value");
+    bw.addMutation(m);
+    bw.flush();
+    m = new Mutation("row1");
+    m.put("colf", "colq", 3l, "value");
+    m.put("colf", "colq", 4l, "value");
+    bw.addMutation(m);
+    bw.flush();
+
+    final Scanner scanner = conn.createScanner(tableName, new Authorizations());
+
+    final ClientSideIteratorScanner csis = new ClientSideIteratorScanner(scanner);
+    final IteratorSetting si = new IteratorSetting(10, "localvers", VersioningIterator.class);
+    si.addOption("maxVersions", "2");
+    csis.addScanIterator(si);
+
+    checkResults(csis, resultSet1, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME);
+    checkResults(scanner, resultSet2, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME);
+
+    csis.fetchColumnFamily(new Text("colf"));
+    checkResults(csis, resultSet1, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME);
+    csis.clearColumns();
+    csis.fetchColumnFamily(new Text("none"));
+    assertFalse(csis.iterator().hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 621620f..249d88f 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -62,6 +63,7 @@ import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.format.Formatter;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.shell.Shell;
 import org.apache.accumulo.test.functional.SlowIterator;
@@ -86,6 +88,7 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
 public class ShellServerIT extends SharedMiniClusterBase {
@@ -1083,6 +1086,115 @@ public class ShellServerIT extends SharedMiniClusterBase {
   }
 
   @Test
+  public void formatter() throws Exception {
+    // Populate a fresh table with four entries spread over two rows.
+    ts.exec("createtable formatter_test", true);
+    ts.exec("table formatter_test", true);
+    ts.exec("insert row cf cq 1234abcd", true);
+    ts.exec("insert row cf1 cq1 9876fedc", true);
+    ts.exec("insert row2 cf cq 13579bdf", true);
+    ts.exec("insert row2 cf1 cq 2468ace", true);
+
+    // Scan with the default formatter; the first line of the shell output is not part of the results.
+    final String[] defaultLines = ts.exec("scan -np", true).split("[\n\r]+");
+    final List<String> actualDefault = new ArrayList<>(4);
+    for (int i = 1; i < defaultLines.length; i++) {
+      actualDefault.add(defaultLines[i]);
+    }
+
+    // Install HexFormatter on the table and scan again, skipping the first output line as before.
+    ts.exec("formatter -t formatter_test -f " + HexFormatter.class.getName(), true);
+    final String[] formattedLines = ts.exec("scan -np", true).split("[\n\r]+");
+    final List<String> actualFormatted = new ArrayList<>(4);
+    for (int i = 1; i < formattedLines.length; i++) {
+      actualFormatted.add(formattedLines[i]);
+    }
+
+    // The table is removed before the assertions run.
+    ts.exec("deletetable -f formatter_test", true);
+
+    final List<String> expectedDefault = new ArrayList<>(4);
+    expectedDefault.add("row cf:cq []    1234abcd");
+    expectedDefault.add("row cf1:cq1 []    9876fedc");
+    expectedDefault.add("row2 cf:cq []    13579bdf");
+    expectedDefault.add("row2 cf1:cq []    2468ace");
+
+    final List<String> expectedFormatted = new ArrayList<>(4);
+    expectedFormatted.add("row cf:cq []    0x31 0x32 0x33 0x34 0x61 0x62 0x63 0x64");
+    expectedFormatted.add("row cf1:cq1 []    0x39 0x38 0x37 0x36 0x66 0x65 0x64 0x63");
+    expectedFormatted.add("row2 cf:cq []    0x31 0x33 0x35 0x37 0x39 0x62 0x64 0x66");
+    expectedFormatted.add("row2 cf1:cq []    0x32 0x34 0x36 0x38 0x61 0x63 0x65");
+
+    assertTrue(Iterables.elementsEqual(expectedDefault, actualDefault));
+    assertTrue(Iterables.elementsEqual(expectedFormatted, actualFormatted));
+  }
+
+  /**
+   * <p>
+   * Simple <code>Formatter</code> that renders each entry as its key, a tab, and the bytes of the Value written as space-separated lowercase hexadecimal
+   * literals. Only bytes that are ASCII digits or the lowercase letters a-f are printed; every other byte in the value is skipped.
+   * </p>
+   *
+   * <p>
+   * Example: <code>'0'</code> will be displayed as <code>'0x30'</code>
+   * </p>
+   */
+  public static class HexFormatter implements Formatter {
+    private final static String tab = "\t";
+    private final static String newline = "\n";
+
+    // Both fields are set by initialize(); hasNext() and next() dereference iter without a null check.
+    private Iterator<Entry<Key,Value>> iter = null;
+    private boolean printTs = false;
+
+    public HexFormatter() {}
+
+    /**
+     * Stores the iterator to draw entries from and whether keys are printed with their timestamp.
+     */
+    @Override
+    public void initialize(final Iterable<Entry<Key,Value>> scanner, final boolean printTimestamps) {
+      this.iter = scanner.iterator();
+      this.printTs = printTimestamps;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return this.iter.hasNext();
+    }
+
+    /**
+     * Formats the next entry.
+     *
+     * @return the key, a tab and the hex-encoded value bytes, trimmed of surrounding whitespace and terminated by a newline
+     */
+    @Override
+    public String next() {
+      final Entry<Key,Value> entry = iter.next();
+
+      // Observe the timestamps
+      final String key = printTs ? entry.getKey().toString() : entry.getKey().toStringNoTime();
+
+      final Value v = entry.getValue();
+
+      // Approximate how much space we'll need: "0xNN " is five characters per printed byte
+      final StringBuilder sb = new StringBuilder(key.length() + v.getSize() * 5);
+      sb.append(key);
+      sb.append(tab);
+
+      for (byte b : v.get()) {
+        final boolean isDigit = b >= '0' && b <= '9';
+        final boolean isLowerHexLetter = b >= 'a' && b <= 'f';
+        if (isDigit || isLowerHexLetter) {
+          sb.append("0x").append(Integer.toHexString(b)).append(' ');
+        }
+      }
+
+      // trim() drops the trailing separator (and the tab when no byte was printed)
+      return sb.toString().trim() + newline;
+    }
+
+    /**
+     * Does nothing; removal is not supported by this formatter.
+     */
+    @Override
+    public void remove() {}
+  }
+
+  @Test
   public void extensions() throws Exception {
     String extName = "ExampleShellExtension";
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java b/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java
new file mode 100644
index 0000000..8853733
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.examples.simple.dirlist;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.examples.simple.dirlist.FileCount;
+import org.apache.accumulo.examples.simple.dirlist.FileCount.Opts;
+import org.apache.accumulo.examples.simple.dirlist.Ingest;
+import org.apache.accumulo.examples.simple.dirlist.QueryUtil;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for the dirlist example's {@link FileCount}: a small directory tree is ingested into a fresh table, {@code FileCount} is run against it
+ * and the values it writes to the {@code dir:counts} column are compared, row by row, with the expected values.
+ */
+public class CountIT extends ConfigurableMacBase {
+
+  private Connector conn;
+  private String tableName;
+
+  /**
+   * Creates a uniquely named table and writes the directory/file mutations that the test counts.
+   */
+  @Before
+  public void setupInstance() throws Exception {
+    tableName = getUniqueNames(1)[0];
+    conn = getConnector();
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    ColumnVisibility cv = new ColumnVisibility();
+    // / has 1 dir
+    // /local has 2 dirs 1 file
+    // /local/user1 has 2 files
+    bw.addMutation(Ingest.buildMutation(cv, "/local", true, false, true, 272, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user1", true, false, true, 272, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user2", true, false, true, 272, 12345, null));
+    // NOTE(review): /local/file is written twice with different trailing arguments; presumably two versions of the same file - confirm
+    bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 23456, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file1", false, false, false, 2024, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file2", false, false, false, 1028, 23456, null));
+    bw.close();
+  }
+
+  /**
+   * Verifies that the {@code dir:counts} column is empty before {@link FileCount} runs and holds the expected value for every directory row afterwards.
+   */
+  @Test
+  public void test() throws Exception {
+    Scanner scanner = conn.createScanner(tableName, new Authorizations());
+    scanner.fetchColumn(new Text("dir"), new Text("counts"));
+    // no counts exist until FileCount has run
+    assertFalse(scanner.iterator().hasNext());
+
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.instance = conn.getInstance().getInstanceName();
+    opts.zookeepers = conn.getInstance().getZooKeepers();
+    opts.setTableName(tableName);
+    opts.setPrincipal(conn.whoami());
+    opts.setPassword(new Opts.Password(ROOT_PASSWORD));
+    FileCount fc = new FileCount(opts, scanOpts, bwOpts);
+    fc.run();
+
+    // expected (row, counts value) pairs in scan order
+    ArrayList<Pair<String,String>> expected = new ArrayList<Pair<String,String>>();
+    expected.add(new Pair<String,String>(QueryUtil.getRow("").toString(), "1,0,3,3"));
+    expected.add(new Pair<String,String>(QueryUtil.getRow("/local").toString(), "2,1,2,3"));
+    expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user1").toString(), "0,2,0,2"));
+    expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user2").toString(), "0,0,0,0"));
+
+    int i = 0;
+    for (Entry<Key,Value> e : scanner) {
+      // report surplus rows as a test failure rather than an IndexOutOfBoundsException
+      assertFalse("Unexpected extra row: " + e.getKey().getRow(), i >= expected.size());
+      assertEquals(expected.get(i).getFirst(), e.getKey().getRow().toString());
+      assertEquals(expected.get(i).getSecond(), e.getValue().toString());
+      i++;
+    }
+    assertEquals(expected.size(), i);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java
new file mode 100644
index 0000000..b2b6aff
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.examples.simple.filedata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Integration test that feeds table data through {@link ChunkInputFormat} in MapReduce jobs. The mappers cannot fail the JUnit test directly, so they record
+ * their {@link AssertionError}s in a shared map keyed by table name, which the test methods inspect after the job has finished.
+ */
+public class ChunkInputFormatIT extends AccumuloClusterHarness {
+
+  // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to ensure test correctness),
+  // so error tests should check to see if there is at least one error (could be more depending on the test) rather than zero
+  private static final Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+  private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+
+  private static List<Entry<Key,Value>> data;
+  private static List<Entry<Key,Value>> baddata;
+
+  private Connector conn;
+  private String tableName;
+
+  /**
+   * Obtains a connector and a unique table name, and grants the current user the authorizations in {@code AUTHS}.
+   */
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    tableName = getUniqueNames(1)[0];
+    conn.securityOperations().changeUserAuthorizations(conn.whoami(), AUTHS);
+  }
+
+  /**
+   * Points Hadoop's temporary directory below {@code target/} and builds the two shared data sets: {@code data} (rows "a" and "b" with refs and chunks) and
+   * {@code baddata} (row "c" with refs but no chunks).
+   */
+  @BeforeClass
+  public static void setupClass() {
+    System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+
+    data = new ArrayList<Entry<Key,Value>>();
+    ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
+    ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
+    baddata = new ArrayList<Entry<Key,Value>>();
+    ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
+  }
+
+  /**
+   * Asserts that two entries have equal keys and equal values.
+   *
+   * @param e1
+   *          the actual entry (callers in this class pass the entry handed to the mapper)
+   * @param e2
+   *          the expected entry
+   */
+  public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
+    assertEquals(e2.getKey(), e1.getKey());
+    assertEquals(e2.getValue(), e1.getValue());
+  }
+
+  /**
+   * MapReduce driver that reads a table through {@link ChunkInputFormat} with one of the nested test mappers.
+   */
+  public static class CIFTester extends Configured implements Tool {
+    /**
+     * Expects exactly two map calls: the refs and chunk contents of row "a" first, then those of row "b". Always closes the stream.
+     */
+    public static class TestMapper extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+      int count = 0;
+
+      @Override
+      protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        byte[] b = new byte[20];
+        int read;
+        try {
+          switch (count) {
+            case 0:
+              assertEquals(2, key.size());
+              entryEquals(key.get(0), data.get(0));
+              entryEquals(key.get(1), data.get(1));
+              read = value.read(b);
+              assertEquals(8, read);
+              assertEquals("asdfjkl;", new String(b, 0, read));
+              // the stream must be exhausted after the single chunk
+              assertEquals(-1, value.read(b));
+              break;
+            case 1:
+              assertEquals(2, key.size());
+              entryEquals(key.get(0), data.get(4));
+              entryEquals(key.get(1), data.get(5));
+              read = value.read(b);
+              assertEquals(10, read);
+              assertEquals("qwertyuiop", new String(b, 0, read));
+              assertEquals(-1, value.read(b));
+              break;
+            default:
+              fail();
+          }
+        } catch (AssertionError e) {
+          // record instead of rethrowing so the test method can inspect the failure after the job
+          assertionErrors.put(table, e);
+        } finally {
+          value.close();
+        }
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        try {
+          assertEquals(2, count);
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        }
+      }
+    }
+
+    /**
+     * Reads part of the first stream without closing it and then asks for the next key/value, which is expected to throw an {@link IOException}. That
+     * exception is recorded under the key {@code <table>_map_ioexception}.
+     */
+    public static class TestNoClose extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+      int count = 0;
+
+      @Override
+      protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        byte[] b = new byte[5];
+        int read;
+        try {
+          switch (count) {
+            case 0:
+              read = value.read(b);
+              assertEquals(5, read);
+              assertEquals("asdfj", new String(b, 0, read));
+              break;
+            default:
+              fail();
+          }
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        }
+        count++;
+        try {
+          context.nextKeyValue();
+          // not caught here: an AssertionError from fail() propagates out of map()
+          fail();
+        } catch (IOException ioe) {
+          assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
+        }
+      }
+    }
+
+    /**
+     * Expects the refs of row "c" as key, and expects both reading and closing the stream to throw. An operation that completes without throwing is
+     * recorded as an assertion error.
+     */
+    public static class TestBadData extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+      @Override
+      protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        byte[] b = new byte[20];
+        try {
+          assertEquals(2, key.size());
+          entryEquals(key.get(0), baddata.get(0));
+          entryEquals(key.get(1), baddata.get(1));
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        }
+        try {
+          assertFalse(value.read(b) > 0);
+          // reached only when read() returned without throwing: record that as a failure
+          try {
+            fail();
+          } catch (AssertionError e) {
+            assertionErrors.put(table, e);
+          }
+        } catch (Exception e) {
+          // expected, ignore
+        }
+        try {
+          value.close();
+          // reached only when close() returned without throwing: record that as a failure
+          try {
+            fail();
+          } catch (AssertionError e) {
+            assertionErrors.put(table, e);
+          }
+        } catch (Exception e) {
+          // expected, ignore
+        }
+      }
+    }
+
+    /**
+     * Configures and runs the job.
+     *
+     * @param args
+     *          exactly two elements: the table name and the fully qualified mapper class name
+     * @return 0 if the job was successful, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not have exactly two elements
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
+      }
+
+      String table = args[0];
+      // seed one dummy error per key so that a test seeing zero errors knows the map was never populated
+      assertionErrors.put(table, new AssertionError("Dummy"));
+      assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
+      getConf().set("MRTester_tableName", table);
+
+      Job job = Job.getInstance(getConf());
+      job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(ChunkInputFormat.class);
+
+      ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+      ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+      ChunkInputFormat.setInputTableName(job, table);
+      ChunkInputFormat.setScanAuthorizations(job, AUTHS);
+
+      @SuppressWarnings("unchecked")
+      Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class.forName(args[1]);
+      job.setMapperClass(forName);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      // map-only job
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the tester through {@link ToolRunner} with the MapReduce local directory placed below {@code target/}.
+     *
+     * @return the exit code produced by {@link #run(String[])}
+     */
+    public static int main(String... args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      return ToolRunner.run(conf, new CIFTester(), args);
+    }
+  }
+
+  /**
+   * Creates the test table and writes every entry of {@code entries} into it, keeping each key's row, column family, column qualifier, visibility and
+   * timestamp.
+   */
+  private void createTableWithData(final List<Entry<Key,Value>> entries) throws Exception {
+    conn.tableOperations().create(tableName);
+    final BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (Entry<Key,Value> e : entries) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+
+  /**
+   * Runs {@code TestMapper} over the good data; the job must succeed and only the dummy error may be recorded.
+   */
+  @Test
+  public void test() throws Exception {
+    createTableWithData(data);
+
+    assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
+    assertEquals(1, assertionErrors.get(tableName).size());
+  }
+
+  /**
+   * Runs {@code TestNoClose} over the good data; the job is expected to be unsuccessful and an IOException must have been recorded.
+   */
+  @Test
+  public void testErrorOnNextWithoutClose() throws Exception {
+    createTableWithData(data);
+
+    assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
+    assertEquals(1, assertionErrors.get(tableName).size());
+    // this should actually exist, in addition to the dummy entry
+    assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
+  }
+
+  /**
+   * Runs {@code TestBadData} over refs that have no chunks; the job must succeed and only the dummy error may be recorded.
+   */
+  @Test
+  public void testInfoWithoutChunks() throws Exception {
+    createTableWithData(baddata);
+
+    assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
+    assertEquals(1, assertionErrors.get(tableName).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java
new file mode 100644
index 0000000..47db6dd
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.examples.simple.filedata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputStream;
+import org.apache.accumulo.examples.simple.filedata.FileDataIngest;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChunkInputStreamIT extends AccumuloClusterHarness {
+  // Reads chunked file data that was written to a live table back through ChunkInputStream.
+  private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+
+  private Connector conn;
+  private String tableName;
+  private List<Entry<Key,Value>> data;
+  private List<Entry<Key,Value>> baddata;
+  private List<Entry<Key,Value>> multidata;
+
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    tableName = getUniqueNames(1)[0];
+    conn.securityOperations().changeUserAuthorizations(conn.whoami(), AUTHS); // the scan in the test uses AUTHS
+  }
+
+  /** Rebuilds the three fixture lists; testWithAccumulo writes only data, baddata and multidata are not read in this class. */
+  @Before
+  public void setupData() {
+    data = new ArrayList<Entry<Key,Value>>();
+    addData(data, "a", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "a", "refs", "id\0name", "A&B", "name");
+    addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "b", "refs", "id\0name", "A&B", "name");
+    addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    addData(data, "b", "~chunk", 100, 1, "D", "");
+    addData(data, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 1, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 2, "A&B", "");
+    addData(data, "d", "~chunk", 100, 0, "A&B", "");
+    addData(data, "e", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "e", "~chunk", 100, 1, "A&B", "");
+    baddata = new ArrayList<Entry<Key,Value>>();
+    addData(baddata, "a", "~chunk", 100, 0, "A", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 0, "B", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 2, "C", "");
+    addData(baddata, "c", "~chunk", 100, 0, "D", "asdfjkl;");
+    addData(baddata, "c", "~chunk", 100, 2, "E", "");
+    addData(baddata, "d", "~chunk", 100, 0, "F", "asdfjkl;");
+    addData(baddata, "d", "~chunk", 100, 1, "G", "");
+    addData(baddata, "d", "~zzzzz", "colq", "H", "");
+    addData(baddata, "e", "~chunk", 100, 0, "I", "asdfjkl;");
+    addData(baddata, "e", "~chunk", 100, 1, "J", "");
+    addData(baddata, "e", "~chunk", 100, 2, "I", "asdfjkl;");
+    addData(baddata, "f", "~chunk", 100, 2, "K", "asdfjkl;");
+    addData(baddata, "g", "~chunk", 100, 0, "L", "");
+    multidata = new ArrayList<Entry<Key,Value>>();
+    addData(multidata, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "a", "~chunk", 100, 1, "A&B", "");
+    addData(multidata, "a", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 1, "B&C", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 1, "B&C", "");
+  }
+
+  /** Appends an entry built from the given row, family, qualifier and visibility; the value is value.getBytes() (platform charset). */
+  static void addData(List<Entry<Key,Value>> data, String row, String cf, String cq, String vis, String value) {
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), new Text(cq), new Text(vis)), value.getBytes()));
+  }
+
+  /** Appends a chunk entry whose qualifier is intToBytes(chunkSize) followed by the first 4 bytes of intToBytes(chunkCount). */
+  static void addData(List<Entry<Key,Value>> data, String row, String cf, int chunkSize, int chunkCount, String vis, String value) {
+    Text chunkCQ = new Text(FileDataIngest.intToBytes(chunkSize));
+    chunkCQ.append(FileDataIngest.intToBytes(chunkCount), 0, 4);
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), chunkCQ, new Text(vis)), value.getBytes()));
+  }
+
+  /** Writes data to a new table and reads it back through one ChunkInputStream fed by a single scan, checking bytes and visibilities. */
+  @Test
+  public void testWithAccumulo() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, IOException {
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+    Scanner scan = conn.createScanner(tableName, AUTHS);
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(scan.iterator());
+    // All assertions below pass the expected value first so that a failure message labels the values correctly.
+    cis.setSource(pi);
+    assertEquals(8, read = cis.read(b));
+    assertEquals("asdfjkl;", new String(b, 0, read));
+    assertEquals(-1, cis.read(b));
+    // Second file: the fixture for row b stores "qwertyuiop" under the visibilities A&B, B&C and D.
+    cis.setSource(pi);
+    assertEquals(10, read = cis.read(b));
+    assertEquals("qwertyuiop", new String(b, 0, read));
+    assertEquals(-1, cis.read(b));
+    assertEquals("[A&B, B&C, D]", cis.getVisibilities().toString());
+    cis.close();
+    // Third file: the fixture for row c has two 8-byte chunks, read back here as 16 bytes.
+    cis.setSource(pi);
+    assertEquals(16, read = cis.read(b));
+    assertEquals("asdfjkl;asdfjkl;", new String(b, 0, read));
+    assertEquals(-1, cis.read(b));
+    assertEquals("[A&B]", cis.getVisibilities().toString());
+    cis.close();
+    // Fourth file: the fixture for row d is a single empty chunk, so the first read already returns -1.
+    cis.setSource(pi);
+    assertEquals(-1, cis.read(b));
+    cis.close();
+    // Fifth file: the fixture for row e is one 8-byte chunk followed by an empty chunk.
+    cis.setSource(pi);
+    assertEquals(8, read = cis.read(b));
+    assertEquals("asdfjkl;", new String(b, 0, read));
+    assertEquals(-1, cis.read(b));
+    cis.close();
+    // Nothing may be left in the scan once the five files have been read.
+    assertFalse(pi.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
deleted file mode 100644
index 1d38126..0000000
--- a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.functional;
-
-import static java.lang.System.currentTimeMillis;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Before;
-import org.junit.Test;
-
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
-public class AccumuloInputFormatIT extends AccumuloClusterHarness {
-
-  AccumuloInputFormat inputFormat;
-
-  @Override
-  protected int defaultTimeoutSeconds() {
-    return 4 * 60;
-  }
-
-  @Override
-  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-  }
-
-  @Before
-  public void before() {
-    inputFormat = new AccumuloInputFormat();
-  }
-
-  /**
-   * Tests several different paths through the getSplits() method by setting different properties and verifying the results.
-   */
-  @Test
-  public void testGetSplits() throws Exception {
-    Connector conn = getConnector();
-    String table = getUniqueNames(1)[0];
-    conn.tableOperations().create(table);
-    insertData(table, currentTimeMillis());
-
-    ClientConfiguration clientConf = cluster.getClientConfig();
-    AccumuloConfiguration clusterClientConf = new ConfigurationCopy(new DefaultConfiguration());
-
-    // Pass SSL and CredentialProvider options into the ClientConfiguration given to AccumuloInputFormat
-    boolean sslEnabled = Boolean.valueOf(clusterClientConf.get(Property.INSTANCE_RPC_SSL_ENABLED));
-    if (sslEnabled) {
-      ClientProperty[] sslProperties = new ClientProperty[] {ClientProperty.INSTANCE_RPC_SSL_ENABLED, ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH,
-          ClientProperty.RPC_SSL_KEYSTORE_PATH, ClientProperty.RPC_SSL_KEYSTORE_TYPE, ClientProperty.RPC_SSL_KEYSTORE_PASSWORD,
-          ClientProperty.RPC_SSL_TRUSTSTORE_PATH, ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD,
-          ClientProperty.RPC_USE_JSSE, ClientProperty.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS};
-
-      for (ClientProperty prop : sslProperties) {
-        // The default property is returned if it's not in the ClientConfiguration so we don't have to check if the value is actually defined
-        clientConf.setProperty(prop, clusterClientConf.get(prop.getKey()));
-      }
-    }
-
-    Job job = Job.getInstance();
-    AccumuloInputFormat.setInputTableName(job, table);
-    AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
-    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
-
-    // split table
-    TreeSet<Text> splitsToAdd = new TreeSet<Text>();
-    for (int i = 0; i < 10000; i += 1000)
-      splitsToAdd.add(new Text(String.format("%09d", i)));
-    conn.tableOperations().addSplits(table, splitsToAdd);
-    sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // wait for splits to be propagated
-
-    // get splits without setting any range
-    Collection<Text> actualSplits = conn.tableOperations().listSplits(table);
-    List<InputSplit> splits = inputFormat.getSplits(job);
-    assertEquals(actualSplits.size() + 1, splits.size()); // No ranges set on the job so it'll start with -inf
-
-    // set ranges and get splits
-    List<Range> ranges = new ArrayList<Range>();
-    for (Text text : actualSplits)
-      ranges.add(new Range(text));
-    AccumuloInputFormat.setRanges(job, ranges);
-    splits = inputFormat.getSplits(job);
-    assertEquals(actualSplits.size(), splits.size());
-
-    // offline mode
-    AccumuloInputFormat.setOfflineTableScan(job, true);
-    try {
-      inputFormat.getSplits(job);
-      fail("An exception should have been thrown");
-    } catch (IOException e) {}
-
-    conn.tableOperations().offline(table, true);
-    splits = inputFormat.getSplits(job);
-    assertEquals(actualSplits.size(), splits.size());
-
-    // auto adjust ranges
-    ranges = new ArrayList<Range>();
-    for (int i = 0; i < 5; i++)
-      // overlapping ranges
-      ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
-    AccumuloInputFormat.setRanges(job, ranges);
-    splits = inputFormat.getSplits(job);
-    assertEquals(2, splits.size());
-
-    AccumuloInputFormat.setAutoAdjustRanges(job, false);
-    splits = inputFormat.getSplits(job);
-    assertEquals(ranges.size(), splits.size());
-
-    // BatchScan not available for offline scans
-    AccumuloInputFormat.setBatchScan(job, true);
-    // Reset auto-adjust ranges too
-    AccumuloInputFormat.setAutoAdjustRanges(job, true);
-
-    AccumuloInputFormat.setOfflineTableScan(job, true);
-    try {
-      inputFormat.getSplits(job);
-      fail("An exception should have been thrown");
-    } catch (IllegalArgumentException e) {}
-
-    conn.tableOperations().online(table, true);
-    AccumuloInputFormat.setOfflineTableScan(job, false);
-
-    // test for resumption of success
-    splits = inputFormat.getSplits(job);
-    assertEquals(2, splits.size());
-
-    // BatchScan not available with isolated iterators
-    AccumuloInputFormat.setScanIsolation(job, true);
-    try {
-      inputFormat.getSplits(job);
-      fail("An exception should have been thrown");
-    } catch (IllegalArgumentException e) {}
-    AccumuloInputFormat.setScanIsolation(job, false);
-
-    // test for resumption of success
-    splits = inputFormat.getSplits(job);
-    assertEquals(2, splits.size());
-
-    // BatchScan not available with local iterators
-    AccumuloInputFormat.setLocalIterators(job, true);
-    try {
-      inputFormat.getSplits(job);
-      fail("An exception should have been thrown");
-    } catch (IllegalArgumentException e) {}
-    AccumuloInputFormat.setLocalIterators(job, false);
-
-    // Check we are getting back correct type pf split
-    conn.tableOperations().online(table);
-    splits = inputFormat.getSplits(job);
-    for (InputSplit split : splits)
-      assert (split instanceof BatchInputSplit);
-
-    // We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, true)`
-    assertEquals(2, splits.size());
-  }
-
-  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
-
-    for (int i = 0; i < 10000; i++) {
-      String row = String.format("%09d", i);
-
-      Mutation m = new Mutation(new Text(row));
-      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(("" + i).getBytes()));
-      bw.addMutation(m);
-    }
-    bw.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
index 9e3e8b6..4652c33 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
@@ -27,18 +27,27 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Merge;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.util.TabletIterator;
+import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
 import org.apache.hadoop.io.Text;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class MergeIT extends AccumuloClusterHarness {
 
@@ -191,4 +200,73 @@ public class MergeIT extends AccumuloClusterHarness {
 
   }
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /** TabletIterator whose scanner reset first deletes the row of extent ("0", "m", null), to act like a merge during iteration. */
+  private static class TestTabletIterator extends TabletIterator {
+    private final Connector conn;
+    private final String metadataTableName;
+
+    /** Iterates MetadataSchema.TabletsSection.getRange() of the given table, scanned with empty authorizations. */
+    public TestTabletIterator(Connector conn, String metadataTableName) throws Exception {
+      super(conn.createScanner(metadataTableName, Authorizations.EMPTY), MetadataSchema.TabletsSection.getRange(), true, true);
+      this.conn = conn;
+      this.metadataTableName = metadataTableName;
+    }
+
+    /** Deletes every column stored for the tablet, then delegates to the superclass; any failure is rethrown unchecked. */
+    @Override
+    protected void resetScanner() {
+      try {
+        Text tablet = new KeyExtent(new Text("0"), new Text("m"), null).getMetadataEntry();
+        Scanner ds = conn.createScanner(metadataTableName, Authorizations.EMPTY);
+        ds.setRange(new Range(tablet, true, tablet, true));
+        // Collect one delete per column currently found in the tablet's row.
+        Mutation m = new Mutation(tablet);
+        for (Entry<Key,Value> entry : ds) {
+          Key k = entry.getKey();
+          m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp());
+        }
+        // The writer is opened after the scan and closed in finally, so it is not left open when a step fails.
+        BatchWriter bw = conn.createBatchWriter(metadataTableName, new BatchWriterConfig());
+        try {
+          bw.addMutation(m);
+        } finally {
+          bw.close();
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      super.resetScanner();
+    }
+  }
+
+  /** Simulates a merge happening while iterating over tablets and expects the iteration to fail with TabletDeletedException. */
+  @Test
+  public void testMerge() throws Exception {
+    // create a fake metadata table
+    String metadataTableName = getUniqueNames(1)[0];
+    getConnector().tableOperations().create(metadataTableName);
+    // Two extents for "0"; NOTE(review): the 2nd/3rd arguments are presumably end row and previous end row - confirm.
+    KeyExtent lowerExtent = new KeyExtent(new Text("0"), new Text("m"), null);
+    KeyExtent upperExtent = new KeyExtent(new Text("0"), null, null);
+
+    Mutation lowerMutation = lowerExtent.getPrevRowUpdateMutation();
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(lowerMutation, new Value("/d1".getBytes()));
+    Mutation upperMutation = upperExtent.getPrevRowUpdateMutation();
+    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(upperMutation, new Value("/d2".getBytes()));
+
+    BatchWriter writer = getConnector().createBatchWriter(metadataTableName, new BatchWriterConfig());
+    writer.addMutation(lowerMutation);
+    writer.addMutation(upperMutation);
+    writer.close();
+
+    TestTabletIterator tabIter = new TestTabletIterator(getConnector(), metadataTableName);
+    // The expectation is registered after the setup, so only exceptions thrown from here on are matched against it.
+    exception.expect(TabletDeletedException.class);
+    while (tabIter.hasNext()) {
+      tabIter.next();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java b/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java
new file mode 100644
index 0000000..6475e11
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java
@@ -0,0 +1,184 @@
+package org.apache.accumulo.test.gc.replication;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class CloseWriteAheadLogReferencesIT extends ConfigurableMacBase {
+  // Verifies the "closed" flag of stored replication Status records after updateReplicationEntries has run.
+  private static final String WAL = "file:/accumulo/wal/tserver+port/12345";
+
+  private WrappedCloseWriteAheadLogReferences refs;
+  private Connector conn;
+
+  /** Re-declares the protected updateReplicationEntries, presumably so that the tests in this class can call it. */
+  private static class WrappedCloseWriteAheadLogReferences extends CloseWriteAheadLogReferences {
+    public WrappedCloseWriteAheadLogReferences(AccumuloServerContext context) {
+      super(context);
+    }
+
+    @Override
+    protected long updateReplicationEntries(Connector conn, Set<String> closedWals) {
+      return super.updateReplicationEntries(conn, closedWals);
+    }
+  }
+
+  /** Grants the current user WRITE on the replication and metadata tables, then calls ReplicationTable.setOnline. */
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    final String user = conn.whoami();
+    conn.securityOperations().grantTablePermission(user, ReplicationTable.NAME, TablePermission.WRITE);
+    conn.securityOperations().grantTablePermission(user, MetadataTable.NAME, TablePermission.WRITE);
+    ReplicationTable.setOnline(conn);
+  }
+
+  /** Builds the object under test on top of EasyMock stand-ins for the instance and the server configuration factory. */
+  @Before
+  public void setupEasyMockStuff() {
+    // Every stubbed configuration lookup below is answered from this configuration, which is backed by an empty map.
+    final AccumuloConfiguration systemConf = new ConfigurationCopy(new HashMap<String,String>());
+
+    Instance mockInst = createMock(Instance.class);
+    expect(mockInst.getInstanceID()).andReturn(testName.getMethodName()).anyTimes();
+    expect(mockInst.getZooKeepers()).andReturn("localhost").anyTimes();
+    expect(mockInst.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+
+    SiteConfiguration siteConfig = createMock(SiteConfiguration.class);
+    ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
+    expect(factory.getConfiguration()).andReturn(systemConf).anyTimes();
+    expect(factory.getInstance()).andReturn(mockInst).anyTimes();
+    expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+    // The SiteConfiguration mock forwards get(Property), getBoolean(Property) and iterator() to systemConf.
+    expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+      @Override
+      public String answer() {
+        return systemConf.get((Property) EasyMock.getCurrentArguments()[0]);
+      }
+    }).anyTimes();
+    expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
+      @Override
+      public Boolean answer() {
+        return systemConf.getBoolean((Property) EasyMock.getCurrentArguments()[0]);
+      }
+    }).anyTimes();
+    expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+      @Override
+      public Iterator<Entry<String,String>> answer() {
+        return systemConf.iterator();
+      }
+    }).anyTimes();
+
+    replay(mockInst, factory, siteConfig);
+    refs = new WrappedCloseWriteAheadLogReferences(new AccumuloServerContext(factory));
+  }
+
+  /** Writes one metadata-table entry for WAL whose value comes from StatusUtil.fileCreatedValue for the current time. */
+  private void writeMetadataEntry() throws Exception {
+    BatchWriter writer = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation mutation = new Mutation(ReplicationSection.getRowPrefix() + WAL);
+    mutation.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+    writer.addMutation(mutation);
+    writer.close();
+  }
+
+  /** Parses the Status stored in the single entry the scanner yields; Iterables.getOnlyElement rejects any other count. */
+  private static Status onlyStatus(Scanner scanner) throws Exception {
+    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+    return Status.parseFrom(entry.getValue().get());
+  }
+
+  /** Returns the single Status found in the ReplicationSection.COLF column family of the metadata table. */
+  private Status onlyMetadataStatus() throws Exception {
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(ReplicationSection.COLF);
+    return onlyStatus(scanner);
+  }
+
+  /** With an empty set of closed WALs, the stored Status must not be marked closed. */
+  @Test
+  public void unclosedWalsLeaveStatusOpen() throws Exception {
+    writeMetadataEntry();
+    refs.updateReplicationEntries(conn, Collections.<String> emptySet());
+    Assert.assertFalse(onlyMetadataStatus().getClosed());
+  }
+
+  /** When the WAL is in the set of closed WALs, the stored Status must be marked closed. */
+  @Test
+  public void closedWalsUpdateStatus() throws Exception {
+    writeMetadataEntry();
+    refs.updateReplicationEntries(conn, Collections.singleton(WAL));
+    Assert.assertTrue(onlyMetadataStatus().getClosed());
+  }
+
+  /** A Status written to the replication table (ingestedUntil(1000)) must stay unclosed even though the WAL is passed as closed. */
+  @Test
+  public void partiallyReplicatedReferencedWalsAreNotClosed() throws Exception {
+    BatchWriter writer = ReplicationTable.getBatchWriter(conn);
+    Mutation mutation = new Mutation(WAL);
+    StatusSection.add(mutation, new Text("1"), ProtobufUtil.toValue(StatusUtil.ingestedUntil(1000)));
+    writer.addMutation(mutation);
+    writer.close();
+
+    refs.updateReplicationEntries(conn, Collections.singleton(WAL));
+    Assert.assertFalse(onlyStatus(ReplicationTable.getScanner(conn)).getClosed());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..7a4223d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapred.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for the {@code mapred} flavor of {@link AccumuloFileOutputFormat}. Each test runs a map-only job (see {@code MRTester}) that reads a
+ * table through {@link AccumuloInputFormat} and writes the mapper output through {@link AccumuloFileOutputFormat}.
+ */
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(AccumuloFileOutputFormatIT.class);
+  private static final int JOB_VISIBILITY_CACHE_SIZE = 3000;
+  private static final String PREFIX = AccumuloFileOutputFormatIT.class.getSimpleName();
+  private static final String BAD_TABLE = PREFIX + "_mapred_bad_table";
+  private static final String TEST_TABLE = PREFIX + "_mapred_test_table";
+  private static final String EMPTY_TABLE = PREFIX + "_mapred_empty_table";
+
+  // Assertion failures captured inside BadKeyMapper (e1 from map(), e2 from close()) and inspected by the test afterwards.
+  // Declared volatile because the mapper is not guaranteed to run on the thread that executes the test method.
+  private static volatile AssertionError e1 = null;
+  private static volatile AssertionError e2 = null;
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  /**
+   * Runs the job against a freshly created, empty table and expects no {@code part-m-} files in the output.
+   */
+  @Test
+  public void testEmptyWrite() throws Exception {
+    getConnector().tableOperations().create(EMPTY_TABLE);
+    handleWriteTests(false);
+  }
+
+  /**
+   * Runs the job against a table holding a single entry and expects exactly one {@code part-m-} file in the output.
+   */
+  @Test
+  public void testRealWrite() throws Exception {
+    Connector c = getConnector();
+    c.tableOperations().create(TEST_TABLE);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+    Mutation m = new Mutation("Key");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+    handleWriteTests(true);
+  }
+
+  /**
+   * Hadoop {@link Tool} that configures and runs the map-only job. Expects two arguments: the input table name and the output path.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Mapper used for {@code BAD_TABLE}. It forwards every entry to the output and records, in {@code e1}/{@code e2}, any deviation from the expectation
+     * that collecting the entry at index 2 throws and that {@code index} equals 2 when the mapper is closed.
+     */
+    private static class BadKeyMapper implements Mapper<Key,Value,Key,Value> {
+
+      int index = 0;
+
+      @Override
+      public void map(Key key, Value value, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+        try {
+          try {
+            output.collect(key, value);
+            if (index == 2)
+              fail("Expected output.collect to throw for the entry at index 2");
+          } catch (Exception e) {
+            // An exception from collect is only acceptable for the entry at index 2.
+            log.error(e.toString(), e);
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          // Assertion errors are stored rather than rethrown so the test method can check them after the job finishes.
+          e1 = e;
+        }
+        index++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    /**
+     * Builds and runs the job.
+     *
+     * @param args
+     *          {@code args[0]} is the input table, {@code args[1]} is the output path
+     * @return 0 if the job was successful, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly two elements
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+      }
+
+      String table = args[0];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+      ConfiguratorBase.setVisibilityCacheSize(job, JOB_VISIBILITY_CACHE_SIZE);
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+      AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloFileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+      // Only the "bad" table is mapped with the asserting mapper; every other table is passed through unchanged.
+      job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloFileOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the tool through {@link ToolRunner} with a local MapReduce scratch directory under {@code target/} and asserts that it exits with 0.
+     */
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  /**
+   * Reserves a path named after the running test inside the temporary folder and removes the placeholder file again, so that nothing exists at the returned
+   * path when the job is started.
+   */
+  private File newOutputLocation() throws IOException {
+    File f = folder.newFile(testName.getMethodName());
+    if (f.delete()) {
+      log.debug("Deleted {}", f);
+    }
+    return f;
+  }
+
+  /**
+   * Runs the job against {@code TEST_TABLE} or {@code EMPTY_TABLE} and checks the number of {@code part-m-} files found at the output path.
+   *
+   * @param content
+   *          {@code true} to read {@code TEST_TABLE} and expect exactly one output file, {@code false} to read {@code EMPTY_TABLE} and expect none
+   */
+  private void handleWriteTests(boolean content) throws Exception {
+    File f = newOutputLocation();
+    MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    assertTrue(f.exists());
+    File[] files = f.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.getName().startsWith("part-m-");
+      }
+    });
+    assertNotNull(files);
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  /**
+   * Runs the job with {@code BadKeyMapper} against {@code BAD_TABLE} and verifies that the mapper recorded no assertion failures.
+   */
+  @Test
+  public void writeBadVisibility() throws Exception {
+    Connector c = getConnector();
+    c.tableOperations().create(BAD_TABLE);
+    BatchWriter bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+    Mutation m = new Mutation("r1");
+    // NOTE(review): "A&B" / "A&" are passed as the third argument of Mutation.put here - confirm whether they were meant to be column visibilities rather
+    // than values, since BadKeyMapper expects the third entry to be rejected.
+    m.put("cf1", "cq1", "A&B");
+    m.put("cf1", "cq1", "A&B");
+    m.put("cf1", "cq2", "A&");
+    bw.addMutation(m);
+    bw.close();
+    File f = newOutputLocation();
+    // Clear any failure left behind in the static holders before the mapper gets a chance to record a new one.
+    e1 = null;
+    e2 = null;
+    MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+    assertNull(e1);
+    assertNull(e2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..2cef382
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration tests for the {@code mapred} flavor of {@link AccumuloInputFormat}: one test runs a map-only job over a table and validates every entry seen
+ * by the mapper, the other checks that the job configuration is carried over into the generated {@link RangeInputSplit}.
+ */
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+  @BeforeClass
+  public static void setupClass() {
+    System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+  }
+
+  // Assertion failures captured inside TestMapper (e1 from map(), e2 from close()) and inspected by the test afterwards.
+  // Declared volatile because the mapper is not guaranteed to run on the thread that executes the test method.
+  private static volatile AssertionError e1 = null;
+  private static volatile AssertionError e2 = null;
+
+  /**
+   * Hadoop {@link Tool} that configures and runs the map-only job. Expects a single argument: the input table name.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Mapper that checks the entries it receives against the data written by {@code testMap}: the n-th entry (counting from 0) must have the row
+     * {@code String.format("%09x", n + 1)} and the value {@code String.format("%09x", n)}, and exactly 100 entries must have been seen on close.
+     */
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+        try {
+          // Each value repeats the row of the previous entry.
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          // Expected value first, actual value second, so that failure messages read correctly.
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          // Assertion errors are stored rather than rethrown so the test method can check them after the job finishes.
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    /**
+     * Builds and runs the job.
+     *
+     * @param args
+     *          {@code args[0]} is the input table
+     * @return 0 if the job was successful, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly one element
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 1) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table>");
+      }
+
+      String table = args[0];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the tool through {@link ToolRunner} with a local MapReduce scratch directory under {@code target/} and asserts that it exits with 0.
+     */
+    public static void main(String... args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  /**
+   * Writes 100 entries whose row is {@code i + 1} and whose value is {@code i} (both as 9-digit hex), runs the job over them and verifies that the mapper
+   * recorded no assertion failures.
+   */
+  @Test
+  public void testMap() throws Exception {
+    String table = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(table);
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    // Clear any failure left behind in the static holders before the mapper gets a chance to record a new one.
+    e1 = null;
+    e2 = null;
+    MRTester.main(table);
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  /**
+   * Configures a job with non-default scan options and verifies that the single split returned by {@link AccumuloInputFormat#getSplits} is a
+   * {@link RangeInputSplit} reporting the same principal, token, table, authorizations, instance name, isolation flag, local-iterator flag, fetched columns
+   * and log level.
+   */
+  @Test
+  public void testCorrectRangeInputSplits() throws Exception {
+    JobConf job = new JobConf();
+
+    String table = getUniqueNames(1)[0];
+    Authorizations auths = new Authorizations("foo");
+    Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
+    boolean isolated = true, localIters = true;
+    Level level = Level.WARN;
+
+    Connector connector = getConnector();
+    connector.tableOperations().create(table);
+
+    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+    AccumuloInputFormat.setScanIsolation(job, isolated);
+    AccumuloInputFormat.setLocalIterators(job, localIters);
+    AccumuloInputFormat.fetchColumns(job, fetchColumns);
+    AccumuloInputFormat.setLogLevel(job, level);
+
+    AccumuloInputFormat aif = new AccumuloInputFormat();
+
+    InputSplit[] splits = aif.getSplits(job, 1);
+
+    Assert.assertEquals(1, splits.length);
+
+    InputSplit split = splits[0];
+
+    Assert.assertEquals(RangeInputSplit.class, split.getClass());
+
+    RangeInputSplit risplit = (RangeInputSplit) split;
+
+    Assert.assertEquals(getAdminPrincipal(), risplit.getPrincipal());
+    Assert.assertEquals(table, risplit.getTableName());
+    Assert.assertEquals(getAdminToken(), risplit.getToken());
+    Assert.assertEquals(auths, risplit.getAuths());
+    Assert.assertEquals(getConnector().getInstance().getInstanceName(), risplit.getInstanceName());
+    Assert.assertEquals(isolated, risplit.isIsolatedScan());
+    Assert.assertEquals(localIters, risplit.usesLocalIterators());
+    Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
+    Assert.assertEquals(level, risplit.getLogLevel());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java
new file mode 100644
index 0000000..35ba7bc
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloMultiTableInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+/**
+ * Integration test for the {@code mapred} flavor of {@link AccumuloMultiTableInputFormat}: runs a map-only job over two tables and validates every entry
+ * seen by the mapper against the table named by its input split.
+ */
+public class AccumuloMultiTableInputFormatIT extends AccumuloClusterHarness {
+
+  // Assertion failures captured inside TestMapper (e1 from map(), e2 from close()) and inspected by the test afterwards.
+  // Declared volatile because the mapper is not guaranteed to run on the thread that executes the test method.
+  private static volatile AssertionError e1 = null;
+  private static volatile AssertionError e2 = null;
+
+  /**
+   * Hadoop {@link Tool} that configures and runs the map-only job. Expects two arguments: the names of the two input tables.
+   */
+  private static class MRTester extends Configured implements Tool {
+    /**
+     * Mapper that checks the entries it receives against the data written by {@code testMap}: for the table reported by the current input split, the n-th
+     * entry (counting from 0) must have the row {@code <table>_<n + 1>} and the value {@code <table>_<n>} (numbers as 9-digit hex), and 100 entries must
+     * have been seen on close. The counters are per mapper instance; presumably each table is processed by its own instance - confirm against the job
+     * runner in use.
+     */
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+        try {
+          String tableName = ((RangeInputSplit) reporter.getInputSplit()).getTableName();
+          // Each value repeats the row of the previous entry.
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          // Assertion errors are stored rather than rethrown so the test method can check them after the job finishes.
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    /**
+     * Builds and runs the job.
+     *
+     * @param args
+     *          {@code args[0]} and {@code args[1]} are the two input tables
+     * @return 0 if the job was successful, 1 otherwise
+     * @throws IllegalArgumentException
+     *           if {@code args} does not contain exactly two elements
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table1> <table2>");
+      }
+
+      String user = getAdminPrincipal();
+      AuthenticationToken pass = getAdminToken();
+      String table1 = args[0];
+      String table2 = args[1];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      // Install the multi-table input format itself so that the job actually exercises the class under test.
+      job.setInputFormat(AccumuloMultiTableInputFormat.class);
+
+      AccumuloMultiTableInputFormat.setConnectorInfo(job, user, pass);
+      AccumuloMultiTableInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      // Both tables are read with a default (unrestricted) table configuration.
+      InputTableConfig tableConfig1 = new InputTableConfig();
+      InputTableConfig tableConfig2 = new InputTableConfig();
+
+      Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
+      configMap.put(table1, tableConfig1);
+      configMap.put(table2, tableConfig2);
+
+      AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    /**
+     * Runs the tool through {@link ToolRunner} with a local MapReduce scratch directory under {@code target/} and asserts that it exits with 0.
+     */
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  /**
+   * Writes 100 entries into each of two tables (row {@code <table>_<i + 1>}, value {@code <table>_<i>}, numbers as 9-digit hex), runs the job over both
+   * tables and verifies that the mapper recorded no assertion failures.
+   */
+  @Test
+  public void testMap() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(table1);
+    c.tableOperations().create(table2);
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+      bw2.addMutation(t2m);
+    }
+    bw.close();
+    bw2.close();
+
+    // Clear any failure left behind in the static holders before the mapper gets a chance to record a new one.
+    e1 = null;
+    e2 = null;
+    MRTester.main(new String[] {table1, table2});
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+}