Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/07/30 23:51:38 UTC
[04/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
new file mode 100644
index 0000000..11fb6b0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientSideIteratorIT extends AccumuloClusterHarness {
+ private List<Key> resultSet1;
+ private List<Key> resultSet2;
+ private List<Key> resultSet3;
+
+ @Before
+ public void setupData() {
+ resultSet1 = new ArrayList<Key>();
+ resultSet1.add(new Key("row1", "colf", "colq", 4L));
+ resultSet1.add(new Key("row1", "colf", "colq", 3L));
+ resultSet2 = new ArrayList<Key>();
+ resultSet2.add(new Key("row1", "colf", "colq", 4L));
+ resultSet2.add(new Key("row1", "colf", "colq", 3L));
+ resultSet2.add(new Key("row1", "colf", "colq", 2L));
+ resultSet2.add(new Key("row1", "colf", "colq", 1L));
+ resultSet3 = new ArrayList<Key>();
+ resultSet3.add(new Key("part1", "", "doc2"));
+ resultSet3.add(new Key("part2", "", "DOC2"));
+ }
+
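+ // Assert that the scan yields exactly the expected keys, comparing key fields
+ // only up to the given PartialKey depth.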
+ private void checkResults(final Iterable<Entry<Key,Value>> scanner, final List<Key> results, final PartialKey pk) {
+ int i = 0;
+ for (Entry<Key,Value> entry : scanner) {
+ assertTrue(entry.getKey().equals(results.get(i++), pk));
+ }
+ assertEquals(i, results.size());
+ }
+
+ private Connector conn;
+ private String tableName;
+
+ @Before
+ public void setupInstance() throws Exception {
+ conn = getConnector();
+ tableName = getUniqueNames(1)[0];
+ }
+
+ @Test
+ public void testIntersect() throws Exception {
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("part1");
+ m.put("bar", "doc1", "value");
+ m.put("bar", "doc2", "value");
+ m.put("dog", "doc3", "value");
+ m.put("foo", "doc2", "value");
+ m.put("foo", "doc3", "value");
+ bw.addMutation(m);
+ m = new Mutation("part2");
+ m.put("bar", "DOC1", "value");
+ m.put("bar", "DOC2", "value");
+ m.put("dog", "DOC3", "value");
+ m.put("foo", "DOC2", "value");
+ m.put("foo", "DOC3", "value");
+ bw.addMutation(m);
+ bw.flush();
+
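+ // Wrap a regular Scanner so the IntersectingIterator runs in the client JVM
+ // rather than on the tablet servers; it should return only the doc IDs that
+ // appear under both the "bar" and "foo" terms within each row.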
+ final ClientSideIteratorScanner csis = new ClientSideIteratorScanner(conn.createScanner(tableName, new Authorizations()));
+ final IteratorSetting si = new IteratorSetting(10, tableName, IntersectingIterator.class);
+ IntersectingIterator.setColumnFamilies(si, new Text[] {new Text("bar"), new Text("foo")});
+ csis.addScanIterator(si);
+
+ checkResults(csis, resultSet3, PartialKey.ROW_COLFAM_COLQUAL);
+ }
+
+ @Test
+ public void testVersioning() throws Exception {
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().removeProperty(tableName, "table.iterator.scan.vers");
+ conn.tableOperations().removeProperty(tableName, "table.iterator.majc.vers");
+ conn.tableOperations().removeProperty(tableName, "table.iterator.minc.vers");
+ final BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row1");
+ m.put("colf", "colq", 1l, "value");
+ m.put("colf", "colq", 2l, "value");
+ bw.addMutation(m);
+ bw.flush();
+ m = new Mutation("row1");
+ m.put("colf", "colq", 3l, "value");
+ m.put("colf", "colq", 4l, "value");
+ bw.addMutation(m);
+ bw.flush();
+
+ final Scanner scanner = conn.createScanner(tableName, new Authorizations());
+
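+ // The table's default VersioningIterator was removed above, so the plain scanner
+ // sees all four versions; the client-side VersioningIterator below (maxVersions=2)
+ // should reduce that to the two newest timestamps.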
+ final ClientSideIteratorScanner csis = new ClientSideIteratorScanner(scanner);
+ final IteratorSetting si = new IteratorSetting(10, "localvers", VersioningIterator.class);
+ si.addOption("maxVersions", "2");
+ csis.addScanIterator(si);
+
+ checkResults(csis, resultSet1, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME);
+ checkResults(scanner, resultSet2, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME);
+
+ csis.fetchColumnFamily(new Text("colf"));
+ checkResults(csis, resultSet1, PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME);
+ csis.clearColumns();
+ csis.fetchColumnFamily(new Text("none"));
+ assertFalse(csis.iterator().hasNext());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 621620f..249d88f 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -62,6 +63,7 @@ import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.format.Formatter;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.test.functional.SlowIterator;
@@ -86,6 +88,7 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
public class ShellServerIT extends SharedMiniClusterBase {
@@ -1083,6 +1086,115 @@ public class ShellServerIT extends SharedMiniClusterBase {
}
@Test
+ public void formatter() throws Exception {
+ ts.exec("createtable formatter_test", true);
+ ts.exec("table formatter_test", true);
+ ts.exec("insert row cf cq 1234abcd", true);
+ ts.exec("insert row cf1 cq1 9876fedc", true);
+ ts.exec("insert row2 cf cq 13579bdf", true);
+ ts.exec("insert row2 cf1 cq 2468ace", true);
+
+ ArrayList<String> expectedDefault = new ArrayList<>(4);
+ expectedDefault.add("row cf:cq [] 1234abcd");
+ expectedDefault.add("row cf1:cq1 [] 9876fedc");
+ expectedDefault.add("row2 cf:cq [] 13579bdf");
+ expectedDefault.add("row2 cf1:cq [] 2468ace");
+ ArrayList<String> actualDefault = new ArrayList<>(4);
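+ // Skip the first line of the captured output (the echoed shell command); the remaining lines are the scanned entries.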
+ boolean isFirst = true;
+ for (String s : ts.exec("scan -np", true).split("[\n\r]+")) {
+ if (isFirst) {
+ isFirst = false;
+ } else {
+ actualDefault.add(s);
+ }
+ }
+
+ ArrayList<String> expectedFormatted = new ArrayList<>(4);
+ expectedFormatted.add("row cf:cq [] 0x31 0x32 0x33 0x34 0x61 0x62 0x63 0x64");
+ expectedFormatted.add("row cf1:cq1 [] 0x39 0x38 0x37 0x36 0x66 0x65 0x64 0x63");
+ expectedFormatted.add("row2 cf:cq [] 0x31 0x33 0x35 0x37 0x39 0x62 0x64 0x66");
+ expectedFormatted.add("row2 cf1:cq [] 0x32 0x34 0x36 0x38 0x61 0x63 0x65");
+ ts.exec("formatter -t formatter_test -f " + HexFormatter.class.getName(), true);
+ ArrayList<String> actualFormatted = new ArrayList<>(4);
+ isFirst = true;
+ for (String s : ts.exec("scan -np", true).split("[\n\r]+")) {
+ if (isFirst) {
+ isFirst = false;
+ } else {
+ actualFormatted.add(s);
+ }
+ }
+
+ ts.exec("deletetable -f formatter_test", true);
+
+ assertTrue(Iterables.elementsEqual(expectedDefault, actualDefault));
+ assertTrue(Iterables.elementsEqual(expectedFormatted, actualFormatted));
+ }
+
+ /**
+ * <p>
+ * Simple <code>Formatter</code> that displays each byte of the Value as its hexadecimal character code, automatically skipping bytes which do not fall
+ * within the ASCII [0-9] or [a-f] ranges.
+ * </p>
+ *
+ * <p>
+ * Example: <code>'0'</code> will be displayed as <code>'0x30'</code>
+ * </p>
+ */
+ public static class HexFormatter implements Formatter {
+ private Iterator<Entry<Key,Value>> iter = null;
+ private boolean printTs = false;
+
+ private final static String tab = "\t";
+ private final static String newline = "\n";
+
+ public HexFormatter() {}
+
+ @Override
+ public boolean hasNext() {
+ return this.iter.hasNext();
+ }
+
+ @Override
+ public String next() {
+ final Entry<Key,Value> entry = iter.next();
+
+ String key;
+
+ // Observe the timestamps
+ if (printTs) {
+ key = entry.getKey().toString();
+ } else {
+ key = entry.getKey().toStringNoTime();
+ }
+
+ final Value v = entry.getValue();
+
+ // Approximate how much space we'll need
+ final StringBuilder sb = new StringBuilder(key.length() + v.getSize() * 5);
+
+ sb.append(key).append(tab);
+
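+ // Emit hex only for bytes that are ASCII '0'-'9' (48-57) or 'a'-'f' (97-102).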
+ for (byte b : v.get()) {
+ if ((b >= 48 && b <= 57) || (b >= 97 && b <= 102)) {
+ sb.append(String.format("0x%x ", Integer.valueOf(b)));
+ }
+ }
+
+ return sb.toString().trim() + newline;
+ }
+
+ @Override
+ public void remove() {}
+
+ @Override
+ public void initialize(final Iterable<Entry<Key,Value>> scanner, final boolean printTimestamps) {
+ this.iter = scanner.iterator();
+ this.printTs = printTimestamps;
+ }
+ }
+
+ @Test
public void extensions() throws Exception {
String extName = "ExampleShellExtension";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java b/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java
new file mode 100644
index 0000000..8853733
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/examples/simple/dirlist/CountIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.examples.simple.dirlist;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.examples.simple.dirlist.FileCount;
+import org.apache.accumulo.examples.simple.dirlist.FileCount.Opts;
+import org.apache.accumulo.examples.simple.dirlist.Ingest;
+import org.apache.accumulo.examples.simple.dirlist.QueryUtil;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CountIT extends ConfigurableMacBase {
+
+ private Connector conn;
+ private String tableName;
+
+ @Before
+ public void setupInstance() throws Exception {
+ tableName = getUniqueNames(1)[0];
+ conn = getConnector();
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ ColumnVisibility cv = new ColumnVisibility();
+ // / has 1 dir
+ // /local has 2 dirs 1 file
+ // /local/user1 has 2 files
+ bw.addMutation(Ingest.buildMutation(cv, "/local", true, false, true, 272, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user1", true, false, true, 272, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user2", true, false, true, 272, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 23456, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file1", false, false, false, 2024, 12345, null));
+ bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file2", false, false, false, 1028, 23456, null));
+ bw.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ Scanner scanner = conn.createScanner(tableName, new Authorizations());
+ scanner.fetchColumn(new Text("dir"), new Text("counts"));
+ assertFalse(scanner.iterator().hasNext());
+
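+ // Run the FileCount example; it writes, for each directory, a count entry of the
+ // form "dirs,files,recursiveDirs,recursiveFiles", which the pairs below verify.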
+ Opts opts = new Opts();
+ ScannerOpts scanOpts = new ScannerOpts();
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.instance = conn.getInstance().getInstanceName();
+ opts.zookeepers = conn.getInstance().getZooKeepers();
+ opts.setTableName(tableName);
+ opts.setPrincipal(conn.whoami());
+ opts.setPassword(new Opts.Password(ROOT_PASSWORD));
+ FileCount fc = new FileCount(opts, scanOpts, bwOpts);
+ fc.run();
+
+ ArrayList<Pair<String,String>> expected = new ArrayList<Pair<String,String>>();
+ expected.add(new Pair<String,String>(QueryUtil.getRow("").toString(), "1,0,3,3"));
+ expected.add(new Pair<String,String>(QueryUtil.getRow("/local").toString(), "2,1,2,3"));
+ expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user1").toString(), "0,2,0,2"));
+ expected.add(new Pair<String,String>(QueryUtil.getRow("/local/user2").toString(), "0,0,0,0"));
+
+ int i = 0;
+ for (Entry<Key,Value> e : scanner) {
+ assertEquals(e.getKey().getRow().toString(), expected.get(i).getFirst());
+ assertEquals(e.getValue().toString(), expected.get(i).getSecond());
+ i++;
+ }
+ assertEquals(i, expected.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java
new file mode 100644
index 0000000..b2b6aff
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputFormatIT.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.examples.simple.filedata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class ChunkInputFormatIT extends AccumuloClusterHarness {
+
+ // Track errors in the MapReduce job. Jobs insert a dummy error for the map and cleanup tasks (to ensure test correctness),
+ // so error tests should check for at least one error (there could be more, depending on the test) rather than zero.
+ private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+ private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+
+ private static List<Entry<Key,Value>> data;
+ private static List<Entry<Key,Value>> baddata;
+
+ private Connector conn;
+ private String tableName;
+
+ @Before
+ public void setupInstance() throws Exception {
+ conn = getConnector();
+ tableName = getUniqueNames(1)[0];
+ conn.securityOperations().changeUserAuthorizations(conn.whoami(), AUTHS);
+ }
+
+ @BeforeClass
+ public static void setupClass() {
+ System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+
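+ // Two well-formed file entries ("a" and "b"), each with refs entries plus
+ // ~chunk data (chunk size 100); "baddata" has refs entries but no chunks.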
+ data = new ArrayList<Entry<Key,Value>>();
+ ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
+ ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
+ ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
+ ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
+ ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
+ ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+ ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+ ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
+ ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
+ ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
+ baddata = new ArrayList<Entry<Key,Value>>();
+ ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
+ ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
+ }
+
+ public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
+ assertEquals(e1.getKey(), e2.getKey());
+ assertEquals(e1.getValue(), e2.getValue());
+ }
+
+ public static class CIFTester extends Configured implements Tool {
+ public static class TestMapper extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+ int count = 0;
+
+ @Override
+ protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+ String table = context.getConfiguration().get("MRTester_tableName");
+ assertNotNull(table);
+
+ byte[] b = new byte[20];
+ int read;
+ try {
+ switch (count) {
+ case 0:
+ assertEquals(key.size(), 2);
+ entryEquals(key.get(0), data.get(0));
+ entryEquals(key.get(1), data.get(1));
+ assertEquals(read = value.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = value.read(b), -1);
+ break;
+ case 1:
+ assertEquals(key.size(), 2);
+ entryEquals(key.get(0), data.get(4));
+ entryEquals(key.get(1), data.get(5));
+ assertEquals(read = value.read(b), 10);
+ assertEquals(new String(b, 0, read), "qwertyuiop");
+ assertEquals(read = value.read(b), -1);
+ break;
+ default:
+ fail();
+ }
+ } catch (AssertionError e) {
+ assertionErrors.put(table, e);
+ } finally {
+ value.close();
+ }
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ String table = context.getConfiguration().get("MRTester_tableName");
+ assertNotNull(table);
+
+ try {
+ assertEquals(2, count);
+ } catch (AssertionError e) {
+ assertionErrors.put(table, e);
+ }
+ }
+ }
+
+ public static class TestNoClose extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+ int count = 0;
+
+ @Override
+ protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+ String table = context.getConfiguration().get("MRTester_tableName");
+ assertNotNull(table);
+
+ byte[] b = new byte[5];
+ int read;
+ try {
+ switch (count) {
+ case 0:
+ assertEquals(read = value.read(b), 5);
+ assertEquals(new String(b, 0, read), "asdfj");
+ break;
+ default:
+ fail();
+ }
+ } catch (AssertionError e) {
+ assertionErrors.put(table, e);
+ }
+ count++;
+ try {
+ context.nextKeyValue();
+ fail();
+ } catch (IOException ioe) {
+ assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
+ }
+ }
+ }
+
+ public static class TestBadData extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+ @Override
+ protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+ String table = context.getConfiguration().get("MRTester_tableName");
+ assertNotNull(table);
+
+ byte[] b = new byte[20];
+ try {
+ assertEquals(key.size(), 2);
+ entryEquals(key.get(0), baddata.get(0));
+ entryEquals(key.get(1), baddata.get(1));
+ } catch (AssertionError e) {
+ assertionErrors.put(table, e);
+ }
+ try {
+ assertFalse(value.read(b) > 0);
+ try {
+ fail();
+ } catch (AssertionError e) {
+ assertionErrors.put(table, e);
+ }
+ } catch (Exception e) {
+ // expected, ignore
+ }
+ try {
+ value.close();
+ try {
+ fail();
+ } catch (AssertionError e) {
+ assertionErrors.put(table, e);
+ }
+ } catch (Exception e) {
+ // expected, ignore
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ throw new IllegalArgumentException("Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
+ }
+
+ String table = args[0];
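+ // Seed dummy errors so tests can distinguish "task ran and recorded errors"
+ // from "task never ran"; assertions check for dummy + N entries, not zero.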
+ assertionErrors.put(table, new AssertionError("Dummy"));
+ assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
+ getConf().set("MRTester_tableName", table);
+
+ Job job = Job.getInstance(getConf());
+ job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(ChunkInputFormat.class);
+
+ ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+ ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ ChunkInputFormat.setInputTableName(job, table);
+ ChunkInputFormat.setScanAuthorizations(job, AUTHS);
+
+ @SuppressWarnings("unchecked")
+ Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class.forName(args[1]);
+ job.setMapperClass(forName);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static int main(String... args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ return ToolRunner.run(conf, new CIFTester(), args);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+ for (Entry<Key,Value> e : data) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
+ assertEquals(1, assertionErrors.get(tableName).size());
+ }
+
+ @Test
+ public void testErrorOnNextWithoutClose() throws Exception {
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+ for (Entry<Key,Value> e : data) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
+ assertEquals(1, assertionErrors.get(tableName).size());
+ // this should actually exist, in addition to the dummy entry
+ assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
+ }
+
+ @Test
+ public void testInfoWithoutChunks() throws Exception {
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ for (Entry<Key,Value> e : baddata) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
+ assertEquals(1, assertionErrors.get(tableName).size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java
new file mode 100644
index 0000000..47db6dd
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/examples/simple/filedata/ChunkInputStreamIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.examples.simple.filedata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.examples.simple.filedata.ChunkInputStream;
+import org.apache.accumulo.examples.simple.filedata.FileDataIngest;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChunkInputStreamIT extends AccumuloClusterHarness {
+
+ private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+
+ private Connector conn;
+ private String tableName;
+ private List<Entry<Key,Value>> data;
+ private List<Entry<Key,Value>> baddata;
+ private List<Entry<Key,Value>> multidata;
+
+ @Before
+ public void setupInstance() throws Exception {
+ conn = getConnector();
+ tableName = getUniqueNames(1)[0];
+ conn.securityOperations().changeUserAuthorizations(conn.whoami(), AUTHS);
+ }
+
+ @Before
+ public void setupData() {
+ data = new ArrayList<Entry<Key,Value>>();
+ addData(data, "a", "refs", "id\0ext", "A&B", "ext");
+ addData(data, "a", "refs", "id\0name", "A&B", "name");
+ addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(data, "a", "~chunk", 100, 1, "A&B", "");
+ addData(data, "b", "refs", "id\0ext", "A&B", "ext");
+ addData(data, "b", "refs", "id\0name", "A&B", "name");
+ addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+ addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+ addData(data, "b", "~chunk", 100, 1, "A&B", "");
+ addData(data, "b", "~chunk", 100, 1, "B&C", "");
+ addData(data, "b", "~chunk", 100, 1, "D", "");
+ addData(data, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(data, "c", "~chunk", 100, 1, "A&B", "asdfjkl;");
+ addData(data, "c", "~chunk", 100, 2, "A&B", "");
+ addData(data, "d", "~chunk", 100, 0, "A&B", "");
+ addData(data, "e", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(data, "e", "~chunk", 100, 1, "A&B", "");
+ baddata = new ArrayList<Entry<Key,Value>>();
+ addData(baddata, "a", "~chunk", 100, 0, "A", "asdfjkl;");
+ addData(baddata, "b", "~chunk", 100, 0, "B", "asdfjkl;");
+ addData(baddata, "b", "~chunk", 100, 2, "C", "");
+ addData(baddata, "c", "~chunk", 100, 0, "D", "asdfjkl;");
+ addData(baddata, "c", "~chunk", 100, 2, "E", "");
+ addData(baddata, "d", "~chunk", 100, 0, "F", "asdfjkl;");
+ addData(baddata, "d", "~chunk", 100, 1, "G", "");
+ addData(baddata, "d", "~zzzzz", "colq", "H", "");
+ addData(baddata, "e", "~chunk", 100, 0, "I", "asdfjkl;");
+ addData(baddata, "e", "~chunk", 100, 1, "J", "");
+ addData(baddata, "e", "~chunk", 100, 2, "I", "asdfjkl;");
+ addData(baddata, "f", "~chunk", 100, 2, "K", "asdfjkl;");
+ addData(baddata, "g", "~chunk", 100, 0, "L", "");
+ multidata = new ArrayList<Entry<Key,Value>>();
+ addData(multidata, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(multidata, "a", "~chunk", 100, 1, "A&B", "");
+ addData(multidata, "a", "~chunk", 200, 0, "B&C", "asdfjkl;");
+ addData(multidata, "b", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(multidata, "b", "~chunk", 200, 0, "B&C", "asdfjkl;");
+ addData(multidata, "b", "~chunk", 200, 1, "B&C", "asdfjkl;");
+ addData(multidata, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+ addData(multidata, "c", "~chunk", 100, 1, "B&C", "");
+ }
+
+ static void addData(List<Entry<Key,Value>> data, String row, String cf, String cq, String vis, String value) {
+ data.add(new KeyValue(new Key(new Text(row), new Text(cf), new Text(cq), new Text(vis)), value.getBytes()));
+ }
+
+ static void addData(List<Entry<Key,Value>> data, String row, String cf, int chunkSize, int chunkCount, String vis, String value) {
+ Text chunkCQ = new Text(FileDataIngest.intToBytes(chunkSize));
+ chunkCQ.append(FileDataIngest.intToBytes(chunkCount), 0, 4);
+ data.add(new KeyValue(new Key(new Text(row), new Text(cf), chunkCQ, new Text(vis)), value.getBytes()));
+ }
+
+ @Test
+ public void testWithAccumulo() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, IOException {
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+ for (Entry<Key,Value> e : data) {
+ Key k = e.getKey();
+ Mutation m = new Mutation(k.getRow());
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), e.getValue());
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ Scanner scan = conn.createScanner(tableName, AUTHS);
+
+ ChunkInputStream cis = new ChunkInputStream();
+ byte[] b = new byte[20];
+ int read;
+ PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<Entry<Key,Value>>(scan.iterator());
+
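+ // Each setSource() call reads back one file's chunks from the shared iterator,
+ // walking rows a, b, c, d, and e in order.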
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 10);
+ assertEquals(new String(b, 0, read), "qwertyuiop");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 16);
+ assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ assertEquals(cis.getVisibilities().toString(), "[A&B]");
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ cis.setSource(pi);
+ assertEquals(read = cis.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = cis.read(b), -1);
+ cis.close();
+
+ assertFalse(pi.hasNext());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
deleted file mode 100644
index 1d38126..0000000
--- a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.functional;
-
-import static java.lang.System.currentTimeMillis;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Before;
-import org.junit.Test;
-
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
-public class AccumuloInputFormatIT extends AccumuloClusterHarness {
-
- AccumuloInputFormat inputFormat;
-
- @Override
- protected int defaultTimeoutSeconds() {
- return 4 * 60;
- }
-
- @Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- }
-
- @Before
- public void before() {
- inputFormat = new AccumuloInputFormat();
- }
-
- /**
- * Tests several different paths through the getSplits() method by setting different properties and verifying the results.
- */
- @Test
- public void testGetSplits() throws Exception {
- Connector conn = getConnector();
- String table = getUniqueNames(1)[0];
- conn.tableOperations().create(table);
- insertData(table, currentTimeMillis());
-
- ClientConfiguration clientConf = cluster.getClientConfig();
- AccumuloConfiguration clusterClientConf = new ConfigurationCopy(new DefaultConfiguration());
-
- // Pass SSL and CredentialProvider options into the ClientConfiguration given to AccumuloInputFormat
- boolean sslEnabled = Boolean.valueOf(clusterClientConf.get(Property.INSTANCE_RPC_SSL_ENABLED));
- if (sslEnabled) {
- ClientProperty[] sslProperties = new ClientProperty[] {ClientProperty.INSTANCE_RPC_SSL_ENABLED, ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH,
- ClientProperty.RPC_SSL_KEYSTORE_PATH, ClientProperty.RPC_SSL_KEYSTORE_TYPE, ClientProperty.RPC_SSL_KEYSTORE_PASSWORD,
- ClientProperty.RPC_SSL_TRUSTSTORE_PATH, ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD,
- ClientProperty.RPC_USE_JSSE, ClientProperty.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS};
-
- for (ClientProperty prop : sslProperties) {
- // The default property is returned if it's not in the ClientConfiguration so we don't have to check if the value is actually defined
- clientConf.setProperty(prop, clusterClientConf.get(prop.getKey()));
- }
- }
-
- Job job = Job.getInstance();
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
- AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
-
- // split table
- TreeSet<Text> splitsToAdd = new TreeSet<Text>();
- for (int i = 0; i < 10000; i += 1000)
- splitsToAdd.add(new Text(String.format("%09d", i)));
- conn.tableOperations().addSplits(table, splitsToAdd);
- sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // wait for splits to be propagated
-
- // get splits without setting any range
- Collection<Text> actualSplits = conn.tableOperations().listSplits(table);
- List<InputSplit> splits = inputFormat.getSplits(job);
- assertEquals(actualSplits.size() + 1, splits.size()); // No ranges set on the job so it'll start with -inf
-
- // set ranges and get splits
- List<Range> ranges = new ArrayList<Range>();
- for (Text text : actualSplits)
- ranges.add(new Range(text));
- AccumuloInputFormat.setRanges(job, ranges);
- splits = inputFormat.getSplits(job);
- assertEquals(actualSplits.size(), splits.size());
-
- // offline mode
- AccumuloInputFormat.setOfflineTableScan(job, true);
- try {
- inputFormat.getSplits(job);
- fail("An exception should have been thrown");
- } catch (IOException e) {}
-
- conn.tableOperations().offline(table, true);
- splits = inputFormat.getSplits(job);
- assertEquals(actualSplits.size(), splits.size());
-
- // auto adjust ranges
- ranges = new ArrayList<Range>();
- for (int i = 0; i < 5; i++)
- // overlapping ranges
- ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
- AccumuloInputFormat.setRanges(job, ranges);
- splits = inputFormat.getSplits(job);
- assertEquals(2, splits.size());
-
- AccumuloInputFormat.setAutoAdjustRanges(job, false);
- splits = inputFormat.getSplits(job);
- assertEquals(ranges.size(), splits.size());
-
- // BatchScan not available for offline scans
- AccumuloInputFormat.setBatchScan(job, true);
- // Reset auto-adjust ranges too
- AccumuloInputFormat.setAutoAdjustRanges(job, true);
-
- AccumuloInputFormat.setOfflineTableScan(job, true);
- try {
- inputFormat.getSplits(job);
- fail("An exception should have been thrown");
- } catch (IllegalArgumentException e) {}
-
- conn.tableOperations().online(table, true);
- AccumuloInputFormat.setOfflineTableScan(job, false);
-
- // test for resumption of success
- splits = inputFormat.getSplits(job);
- assertEquals(2, splits.size());
-
- // BatchScan not available with isolated iterators
- AccumuloInputFormat.setScanIsolation(job, true);
- try {
- inputFormat.getSplits(job);
- fail("An exception should have been thrown");
- } catch (IllegalArgumentException e) {}
- AccumuloInputFormat.setScanIsolation(job, false);
-
- // test for resumption of success
- splits = inputFormat.getSplits(job);
- assertEquals(2, splits.size());
-
- // BatchScan not available with local iterators
- AccumuloInputFormat.setLocalIterators(job, true);
- try {
- inputFormat.getSplits(job);
- fail("An exception should have been thrown");
- } catch (IllegalArgumentException e) {}
- AccumuloInputFormat.setLocalIterators(job, false);
-
- // Check we are getting back the correct type of split
- conn.tableOperations().online(table);
- splits = inputFormat.getSplits(job);
- for (InputSplit split : splits)
- assert (split instanceof BatchInputSplit);
-
- // We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, true)`
- assertEquals(2, splits.size());
- }
-
- private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- BatchWriter bw = getConnector().createBatchWriter(tableName, null);
-
- for (int i = 0; i < 10000; i++) {
- String row = String.format("%09d", i);
-
- Mutation m = new Mutation(new Text(row));
- m.put(new Text("cf1"), new Text("cq1"), ts, new Value(("" + i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
index 9e3e8b6..4652c33 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
@@ -27,18 +27,27 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Merge;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.util.TabletIterator;
+import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
import org.apache.hadoop.io.Text;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
public class MergeIT extends AccumuloClusterHarness {
@@ -191,4 +200,73 @@ public class MergeIT extends AccumuloClusterHarness {
}
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ private static class TestTabletIterator extends TabletIterator {
+
+ private final Connector conn;
+ private final String metadataTableName;
+
+ public TestTabletIterator(Connector conn, String metadataTableName) throws Exception {
+ super(conn.createScanner(metadataTableName, Authorizations.EMPTY), MetadataSchema.TabletsSection.getRange(), true, true);
+ this.conn = conn;
+ this.metadataTableName = metadataTableName;
+ }
+
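+ // Delete the first tablet's metadata entries whenever the iterator restarts its
+ // scan, simulating a merge that completes mid-iteration.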
+ @Override
+ protected void resetScanner() {
+ try {
+ Scanner ds = conn.createScanner(metadataTableName, Authorizations.EMPTY);
+ Text tablet = new KeyExtent(new Text("0"), new Text("m"), null).getMetadataEntry();
+ ds.setRange(new Range(tablet, true, tablet, true));
+
+ Mutation m = new Mutation(tablet);
+
+ BatchWriter bw = conn.createBatchWriter(metadataTableName, new BatchWriterConfig());
+ for (Entry<Key,Value> entry : ds) {
+ Key k = entry.getKey();
+ m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp());
+ }
+
+ bw.addMutation(m);
+
+ bw.close();
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ super.resetScanner();
+ }
+
+ }
+
+ // simulate a merge happening while iterating over tablets
+ @Test
+ public void testMerge() throws Exception {
+ // create a fake metadata table
+ String metadataTableName = getUniqueNames(1)[0];
+ getConnector().tableOperations().create(metadataTableName);
+
+ KeyExtent ke1 = new KeyExtent(new Text("0"), new Text("m"), null);
+ Mutation mut1 = ke1.getPrevRowUpdateMutation();
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut1, new Value("/d1".getBytes()));
+
+ KeyExtent ke2 = new KeyExtent(new Text("0"), null, null);
+ Mutation mut2 = ke2.getPrevRowUpdateMutation();
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut2, new Value("/d2".getBytes()));
+
+ BatchWriter bw1 = getConnector().createBatchWriter(metadataTableName, new BatchWriterConfig());
+ bw1.addMutation(mut1);
+ bw1.addMutation(mut2);
+ bw1.close();
+
+ TestTabletIterator tabIter = new TestTabletIterator(getConnector(), metadataTableName);
+
+ exception.expect(TabletDeletedException.class);
+ while (tabIter.hasNext()) {
+ tabIter.next();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java b/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java
new file mode 100644
index 0000000..6475e11
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/gc/replication/CloseWriteAheadLogReferencesIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.gc.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class CloseWriteAheadLogReferencesIT extends ConfigurableMacBase {
+
+ private WrappedCloseWriteAheadLogReferences refs;
+ private Connector conn;
+
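+ // Subclass that re-exposes the protected updateReplicationEntries() so the
+ // enclosing test class can invoke it directly.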
+ private static class WrappedCloseWriteAheadLogReferences extends CloseWriteAheadLogReferences {
+ public WrappedCloseWriteAheadLogReferences(AccumuloServerContext context) {
+ super(context);
+ }
+
+ @Override
+ protected long updateReplicationEntries(Connector conn, Set<String> closedWals) {
+ return super.updateReplicationEntries(conn, closedWals);
+ }
+ }
+
+ @Before
+ public void setupInstance() throws Exception {
+ conn = getConnector();
+ conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+ conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+ ReplicationTable.setOnline(conn);
+ }
+
+ @Before
+ public void setupEasyMockStuff() {
+ Instance mockInst = createMock(Instance.class);
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
+ expect(mockInst.getInstanceID()).andReturn(testName.getMethodName()).anyTimes();
+ expect(mockInst.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(mockInst.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+ final AccumuloConfiguration systemConf = new ConfigurationCopy(new HashMap<String,String>());
+ ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
+ expect(factory.getConfiguration()).andReturn(systemConf).anyTimes();
+ expect(factory.getInstance()).andReturn(mockInst).anyTimes();
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConf.get((Property) args[0]);
+ }
+ }).anyTimes();
+ EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
+ @Override
+ public Boolean answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConf.getBoolean((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConf.iterator();
+ }
+ }).anyTimes();
+
+ replay(mockInst, factory, siteConfig);
+ refs = new WrappedCloseWriteAheadLogReferences(new AccumuloServerContext(factory));
+ }
+
+ @Test
+ public void unclosedWalsLeaveStatusOpen() throws Exception {
+ Set<String> wals = Collections.emptySet();
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wal/tserver+port/12345");
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+ bw.addMutation(m);
+ bw.close();
+
+ refs.updateReplicationEntries(conn, wals);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ Status status = Status.parseFrom(entry.getValue().get());
+ Assert.assertFalse(status.getClosed());
+ }
+
+ @Test
+ public void closedWalsUpdateStatus() throws Exception {
+ String file = "file:/accumulo/wal/tserver+port/12345";
+ Set<String> wals = Collections.singleton(file);
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+ bw.addMutation(m);
+ bw.close();
+
+ refs.updateReplicationEntries(conn, wals);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.fetchColumnFamily(ReplicationSection.COLF);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ Status status = Status.parseFrom(entry.getValue().get());
+ Assert.assertTrue(status.getClosed());
+ }
+
+ @Test
+ public void partiallyReplicatedReferencedWalsAreNotClosed() throws Exception {
+ String file = "file:/accumulo/wal/tserver+port/12345";
+ Set<String> wals = Collections.singleton(file);
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ Mutation m = new Mutation(file);
+ StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.ingestedUntil(1000)));
+ bw.addMutation(m);
+ bw.close();
+
+ refs.updateReplicationEntries(conn, wals);
+
+ Scanner s = ReplicationTable.getScanner(conn);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ Status status = Status.parseFrom(entry.getValue().get());
+ Assert.assertFalse(status.getClosed());
+ }
+
+}
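
For context, setupEasyMockStuff() above uses EasyMock's IAnswer so the mocked SiteConfiguration delegates each call to a live AccumuloConfiguration at invocation time, rather than returning canned values recorded up front. A minimal, self-contained sketch of the same delegation pattern follows; the KeyValueSource interface and all names in it are hypothetical stand-ins, not Accumulo types:

    import static org.easymock.EasyMock.anyObject;
    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.getCurrentArguments;
    import static org.easymock.EasyMock.replay;

    import java.util.HashMap;
    import java.util.Map;

    import org.easymock.IAnswer;

    public class DelegatingMockSketch {
      // Hypothetical stand-in for SiteConfiguration.
      interface KeyValueSource {
        String get(String key);
      }

      public static void main(String[] args) {
        final Map<String,String> backing = new HashMap<String,String>();
        backing.put("a", "1");

        KeyValueSource mock = createMock(KeyValueSource.class);
        // Answer each get(key) by consulting the live map at call time,
        // so later changes to the map are visible through the mock.
        expect(mock.get(anyObject(String.class))).andAnswer(new IAnswer<String>() {
          @Override
          public String answer() {
            return backing.get((String) getCurrentArguments()[0]);
          }
        }).anyTimes();
        replay(mock);

        System.out.println(mock.get("a")); // prints 1
        backing.put("a", "2");
        System.out.println(mock.get("a")); // prints 2 -- delegated, not canned
      }
    }
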
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..7a4223d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapred.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(AccumuloFileOutputFormatIT.class);
+ private static final int JOB_VISIBILITY_CACHE_SIZE = 3000;
+ private static final String PREFIX = AccumuloFileOutputFormatIT.class.getSimpleName();
+ private static final String BAD_TABLE = PREFIX + "_mapred_bad_table";
+ private static final String TEST_TABLE = PREFIX + "_mapred_test_table";
+ private static final String EMPTY_TABLE = PREFIX + "_mapred_empty_table";
+
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+ @Test
+ public void testEmptyWrite() throws Exception {
+ getConnector().tableOperations().create(EMPTY_TABLE);
+ handleWriteTests(false);
+ }
+
+ @Test
+ public void testRealWrite() throws Exception {
+ Connector c = getConnector();
+ c.tableOperations().create(TEST_TABLE);
+ BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+ Mutation m = new Mutation("Key");
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+ handleWriteTests(true);
+ }
+
+ private static class MRTester extends Configured implements Tool {
+ private static class BadKeyMapper implements Mapper<Key,Value,Key,Value> {
+
+ int index = 0;
+
+ @Override
+ public void map(Key key, Value value, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+ try {
+ try {
+ output.collect(key, value);
+ if (index == 2)
+ fail();
+ } catch (Exception e) {
+ log.error(e.toString(), e);
+ assertEquals(2, index);
+ }
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ index++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ assertEquals(2, index);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException("Usage: " + MRTester.class.getName() + " <table> <outputfile>");
+ }
+
+ String table = args[0];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+ ConfiguratorBase.setVisibilityCacheSize(job, JOB_VISIBILITY_CACHE_SIZE);
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+ AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloFileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(AccumuloFileOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ private void handleWriteTests(boolean content) throws Exception {
+ File f = folder.newFile(testName.getMethodName());
+ if (f.delete()) {
+ log.debug("Deleted {}", f);
+ }
+ MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+ assertTrue(f.exists());
+ File[] files = f.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.getName().startsWith("part-m-");
+ }
+ });
+ assertNotNull(files);
+ if (content) {
+ assertEquals(1, files.length);
+ assertTrue(files[0].exists());
+ } else {
+ assertEquals(0, files.length);
+ }
+ }
+
+ @Test
+ public void writeBadVisibility() throws Exception {
+ Connector c = getConnector();
+ c.tableOperations().create(BAD_TABLE);
+ BatchWriter bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+ Mutation m = new Mutation("r1");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq2", "A&");
+ bw.addMutation(m);
+ bw.close();
+ File f = folder.newFile(testName.getMethodName());
+ if (f.delete()) {
+ log.debug("Deleted {}", f);
+ }
+ MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+ assertNull(e1);
+ assertNull(e2);
+ }
+}
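
A note on the static e1/e2 fields above: JUnit assertions thrown inside the Mapper run outside the test thread, so an AssertionError there would fail the map task without failing the test. The tests therefore catch and cache the error, then re-assert on the test thread after the job finishes; this hand-off only works because the local job runner (used here via the mapreduce.cluster.local.dir setup) keeps the job in the same JVM. A stripped-down sketch of the pattern, with a hypothetical Worker in place of the Mapper:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertNull;

    public class CapturedAssertionSketch {
      private static volatile AssertionError captured = null;

      static class Worker implements Runnable {
        @Override
        public void run() {
          try {
            // Assertions here run off the main thread; stash any failure.
            assertEquals(2, 1 + 1);
          } catch (AssertionError e) {
            captured = e;
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Worker());
        t.start();
        t.join();
        // Back on the main thread, surface any cached failure.
        assertNull(captured);
        System.out.println("no assertion failures captured");
      }
    }
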
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..2cef382
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+ @BeforeClass
+ public static void setupClass() {
+ System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+ }
+
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+ assertEquals(String.format("%09x", count), new String(v.get()));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ assertEquals(100, count);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 1) {
+ throw new IllegalArgumentException("Usage: " + MRTester.class.getName() + " <table>");
+ }
+
+ String table = args[0];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String... args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ String table = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(table);
+ BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ MRTester.main(table);
+ assertNull(e1);
+ assertNull(e2);
+ }
+
+ @Test
+ public void testCorrectRangeInputSplits() throws Exception {
+ JobConf job = new JobConf();
+
+ String table = getUniqueNames(1)[0];
+ Authorizations auths = new Authorizations("foo");
+ Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
+ boolean isolated = true, localIters = true;
+ Level level = Level.WARN;
+
+ Connector connector = getConnector();
+ connector.tableOperations().create(table);
+
+ AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setScanAuthorizations(job, auths);
+ AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+ AccumuloInputFormat.setScanIsolation(job, isolated);
+ AccumuloInputFormat.setLocalIterators(job, localIters);
+ AccumuloInputFormat.fetchColumns(job, fetchColumns);
+ AccumuloInputFormat.setLogLevel(job, level);
+
+ AccumuloInputFormat aif = new AccumuloInputFormat();
+
+ InputSplit[] splits = aif.getSplits(job, 1);
+
+ Assert.assertEquals(1, splits.length);
+
+ InputSplit split = splits[0];
+
+ Assert.assertEquals(RangeInputSplit.class, split.getClass());
+
+ RangeInputSplit risplit = (RangeInputSplit) split;
+
+ Assert.assertEquals(getAdminPrincipal(), risplit.getPrincipal());
+ Assert.assertEquals(table, risplit.getTableName());
+ Assert.assertEquals(getAdminToken(), risplit.getToken());
+ Assert.assertEquals(auths, risplit.getAuths());
+ Assert.assertEquals(getConnector().getInstance().getInstanceName(), risplit.getInstanceName());
+ Assert.assertEquals(isolated, risplit.isIsolatedScan());
+ Assert.assertEquals(localIters, risplit.usesLocalIterators());
+ Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
+ Assert.assertEquals(level, risplit.getLogLevel());
+ }
+}
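
A small note on the row naming in testMap() above: Accumulo returns keys in lexicographic byte order, and the zero-padded hex produced by String.format("%09x", ...) makes that order coincide with numeric order, which is what lets TestMapper verify rows against a running count. A quick demonstration:

    public class RowOrderSketch {
      public static void main(String[] args) {
        // Zero-padded hex sorts the same way the numbers do:
        System.out.println(String.format("%09x", 9));   // 000000009
        System.out.println(String.format("%09x", 10));  // 00000000a
        System.out.println(String.format("%09x", 255)); // 0000000ff
        // Unpadded strings do not: "10" sorts before "9".
        System.out.println("10".compareTo("9") < 0); // true
      }
    }
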
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java
new file mode 100644
index 0000000..35ba7bc
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloMultiTableInputFormatIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloMultiTableInputFormat;
+import org.apache.accumulo.core.client.mapred.RangeInputSplit;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloMultiTableInputFormatIT extends AccumuloClusterHarness {
+
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+ try {
+ String tableName = ((RangeInputSplit) reporter.getInputSplit()).getTableName();
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+ assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ public void configure(JobConf job) {}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ assertEquals(100, count);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ throw new IllegalArgumentException("Usage: " + MRTester.class.getName() + " <table1> <table2>");
+ }
+
+ String user = getAdminPrincipal();
+ AuthenticationToken pass = getAdminToken();
+ String table1 = args[0];
+ String table2 = args[1];
+
+ JobConf job = new JobConf(getConf());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormat(AccumuloInputFormat.class);
+
+ AccumuloMultiTableInputFormat.setConnectorInfo(job, user, pass);
+ AccumuloMultiTableInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+ InputTableConfig tableConfig1 = new InputTableConfig();
+ InputTableConfig tableConfig2 = new InputTableConfig();
+
+ Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
+ configMap.put(table1, tableConfig1);
+ configMap.put(table2, tableConfig2);
+
+ AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+ assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+ }
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ Connector c = getConnector();
+ c.tableOperations().create(table1);
+ c.tableOperations().create(table2);
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+ BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+ t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+ bw.addMutation(t1m);
+ Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+ t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+ bw2.addMutation(t2m);
+ }
+ bw.close();
+ bw2.close();
+
+ MRTester.main(new String[] {table1, table2});
+ assertNull(e1);
+ assertNull(e2);
+ }
+
+}
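
testMap() above registers two empty InputTableConfig instances, which scan each table in full. Per-table options can be attached to each config before calling setInputTableConfigs. The sketch below is illustrative only: the table names are hypothetical, and it assumes InputTableConfig's setRanges and fetchColumns setters mirror their single-table AccumuloInputFormat counterparts:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.accumulo.core.client.mapred.AccumuloMultiTableInputFormat;
    import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.util.Pair;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;

    public class MultiTableConfigSketch {
      public static void configure(JobConf job) {
        // Restrict the (hypothetical) "logs" table to a row range.
        InputTableConfig logs = new InputTableConfig();
        logs.setRanges(Collections.singletonList(new Range("2015-01", "2015-12")));

        // Fetch only the "info" column family from the (hypothetical) "users"
        // table; a null qualifier in the Pair selects the whole family.
        InputTableConfig users = new InputTableConfig();
        users.fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("info"), null)));

        Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
        configs.put("logs", logs);
        configs.put("users", users);
        AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
      }
    }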