You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by md...@apache.org on 2014/04/21 23:20:08 UTC
[08/12] ACCUMULO-1880 create mapreduce module
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java
deleted file mode 100644
index 36054c8..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormatTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapred;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
-
-/**
- *
- */
-public class AccumuloOutputFormatTest {
- private static AssertionError e1 = null;
- private static final String PREFIX = AccumuloOutputFormatTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
- private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
- private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2";
-
- private static class MRTester extends Configured implements Tool {
- private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
- Key key = null;
- int count = 0;
- OutputCollector<Text,Mutation> finalOutput;
-
- @Override
- public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
- finalOutput = output;
- try {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- } catch (AssertionError e) {
- e1 = e;
- }
- key = new Key(k);
- count++;
- }
-
- @Override
- public void configure(JobConf job) {}
-
- @Override
- public void close() throws IOException {
- Mutation m = new Mutation("total");
- m.put("", "", Integer.toString(count));
- finalOutput.collect(new Text(), m);
- }
-
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 4) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <inputtable> <outputtable>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table1 = args[2];
- String table2 = args[3];
-
- JobConf job = new JobConf(getConf());
- job.setJarByClass(this.getClass());
-
- job.setInputFormat(AccumuloInputFormat.class);
-
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloInputFormat.setInputTableName(job, table1);
- AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormat(AccumuloOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
-
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloOutputFormat.setCreateTables(job, false);
- AccumuloOutputFormat.setDefaultTableName(job, table2);
- AccumuloOutputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setNumReduceTasks(0);
-
- return JobClient.runJob(job).isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
- }
- }
-
- @Test
- public void testBWSettings() throws IOException {
- JobConf job = new JobConf();
-
- // make sure we aren't testing defaults
- final BatchWriterConfig bwDefaults = new BatchWriterConfig();
- assertNotEquals(7654321l, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
- assertNotEquals(9898989l, bwDefaults.getTimeout(TimeUnit.MILLISECONDS));
- assertNotEquals(42, bwDefaults.getMaxWriteThreads());
- assertNotEquals(1123581321l, bwDefaults.getMaxMemory());
-
- final BatchWriterConfig bwConfig = new BatchWriterConfig();
- bwConfig.setMaxLatency(7654321l, TimeUnit.MILLISECONDS);
- bwConfig.setTimeout(9898989l, TimeUnit.MILLISECONDS);
- bwConfig.setMaxWriteThreads(42);
- bwConfig.setMaxMemory(1123581321l);
- AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
-
- AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
- @Override
- public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- BatchWriterConfig bwOpts = getBatchWriterOptions(job);
-
- // passive check
- assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
- assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), bwOpts.getTimeout(TimeUnit.MILLISECONDS));
- assertEquals(bwConfig.getMaxWriteThreads(), bwOpts.getMaxWriteThreads());
- assertEquals(bwConfig.getMaxMemory(), bwOpts.getMaxMemory());
-
- // explicit check
- assertEquals(7654321l, bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
- assertEquals(9898989l, bwOpts.getTimeout(TimeUnit.MILLISECONDS));
- assertEquals(42, bwOpts.getMaxWriteThreads());
- assertEquals(1123581321l, bwOpts.getMaxMemory());
-
- }
- };
- myAOF.checkOutputSpecs(null, job);
- }
-
- @Test
- public void testMR() throws Exception {
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new PasswordToken(""));
- c.tableOperations().create(TEST_TABLE_1);
- c.tableOperations().create(TEST_TABLE_2);
- BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
- assertNull(e1);
-
- Scanner scanner = c.createScanner(TEST_TABLE_2, new Authorizations());
- Iterator<Entry<Key,Value>> iter = scanner.iterator();
- assertTrue(iter.hasNext());
- Entry<Key,Value> entry = iter.next();
- assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
- assertFalse(iter.hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java
deleted file mode 100644
index a0ae0b3..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormatTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapred;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyValue;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
-
-public class AccumuloRowInputFormatTest {
- private static final String PREFIX = AccumuloRowInputFormatTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
- private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
-
- private static final String ROW1 = "row1";
- private static final String ROW2 = "row2";
- private static final String ROW3 = "row3";
- private static final String COLF1 = "colf1";
- private static List<Entry<Key,Value>> row1;
- private static List<Entry<Key,Value>> row2;
- private static List<Entry<Key,Value>> row3;
- private static AssertionError e1 = null;
- private static AssertionError e2 = null;
-
- public AccumuloRowInputFormatTest() {
- row1 = new ArrayList<Entry<Key,Value>>();
- row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes()));
- row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes()));
- row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes()));
- row2 = new ArrayList<Entry<Key,Value>>();
- row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes()));
- row3 = new ArrayList<Entry<Key,Value>>();
- row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
- }
-
- public static void checkLists(final List<Entry<Key,Value>> first, final List<Entry<Key,Value>> second) {
- assertEquals("Sizes should be the same.", first.size(), second.size());
- for (int i = 0; i < first.size(); i++) {
- assertEquals("Keys should be equal.", first.get(i).getKey(), second.get(i).getKey());
- assertEquals("Values should be equal.", first.get(i).getValue(), second.get(i).getValue());
- }
- }
-
- public static void checkLists(final List<Entry<Key,Value>> first, final Iterator<Entry<Key,Value>> second) {
- int entryIndex = 0;
- while (second.hasNext()) {
- final Entry<Key,Value> entry = second.next();
- assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey());
- assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue());
- entryIndex++;
- }
- }
-
- public static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) throws MutationsRejectedException {
- for (Entry<Key,Value> e : list) {
- final Key key = e.getKey();
- final Mutation mutation = new Mutation(key.getRow());
- ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility());
- mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, key.getTimestamp(), e.getValue());
- writer.addMutation(mutation);
- }
- }
-
- private static class MRTester extends Configured implements Tool {
- public static class TestMapper implements Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> {
- int count = 0;
-
- @Override
- public void map(Text k, PeekingIterator<Entry<Key,Value>> v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
- try {
- switch (count) {
- case 0:
- assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
- checkLists(row1, v);
- break;
- case 1:
- assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
- checkLists(row2, v);
- break;
- case 2:
- assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
- checkLists(row3, v);
- break;
- default:
- assertTrue(false);
- }
- } catch (AssertionError e) {
- e1 = e;
- }
- count++;
- }
-
- @Override
- public void configure(JobConf job) {}
-
- @Override
- public void close() throws IOException {
- try {
- assertEquals(3, count);
- } catch (AssertionError e) {
- e2 = e;
- }
- }
-
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 3) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table = args[2];
-
- JobConf job = new JobConf(getConf());
- job.setJarByClass(this.getClass());
-
- job.setInputFormat(AccumuloRowInputFormat.class);
-
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloRowInputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormat(NullOutputFormat.class);
-
- job.setNumReduceTasks(0);
-
- return JobClient.runJob(job).isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
- }
- }
-
- @Test
- public void test() throws Exception {
- final MockInstance instance = new MockInstance(INSTANCE_NAME);
- final Connector conn = instance.getConnector("root", new PasswordToken(""));
- conn.tableOperations().create(TEST_TABLE_1);
- BatchWriter writer = null;
- try {
- writer = conn.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- insertList(writer, row1);
- insertList(writer, row2);
- insertList(writer, row3);
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
- MRTester.main(new String[] {"root", "", TEST_TABLE_1});
- assertNull(e1);
- assertNull(e2);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java
deleted file mode 100644
index 0e1fe39..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/TokenFileTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapred;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- *
- */
-public class TokenFileTest {
- private static AssertionError e1 = null;
- private static final String PREFIX = TokenFileTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapred_instance";
- private static final String TEST_TABLE_1 = PREFIX + "_mapred_table_1";
- private static final String TEST_TABLE_2 = PREFIX + "_mapred_table_2";
-
- private static class MRTokenFileTester extends Configured implements Tool {
- private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
- Key key = null;
- int count = 0;
- OutputCollector<Text,Mutation> finalOutput;
-
- @Override
- public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
- finalOutput = output;
- try {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- } catch (AssertionError e) {
- e1 = e;
- }
- key = new Key(k);
- count++;
- }
-
- @Override
- public void configure(JobConf job) {}
-
- @Override
- public void close() throws IOException {
- Mutation m = new Mutation("total");
- m.put("", "", Integer.toString(count));
- finalOutput.collect(new Text(), m);
- }
-
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 4) {
- throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName() + " <user> <token file> <inputtable> <outputtable>");
- }
-
- String user = args[0];
- String tokenFile = args[1];
- String table1 = args[2];
- String table2 = args[3];
-
- JobConf job = new JobConf(getConf());
- job.setJarByClass(this.getClass());
-
- job.setInputFormat(AccumuloInputFormat.class);
-
- AccumuloInputFormat.setConnectorInfo(job, user, tokenFile);
- AccumuloInputFormat.setInputTableName(job, table1);
- AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormat(AccumuloOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
-
- AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
- AccumuloOutputFormat.setCreateTables(job, false);
- AccumuloOutputFormat.setDefaultTableName(job, table2);
- AccumuloOutputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setNumReduceTasks(0);
-
- return JobClient.runJob(job).isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = CachedConfiguration.getInstance();
- conf.set("hadoop.tmp.dir", new File(args[1]).getParent());
- assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
- }
- }
-
- @Rule
- public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
-
- @Test
- public void testMR() throws Exception {
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new PasswordToken(""));
- c.tableOperations().create(TEST_TABLE_1);
- c.tableOperations().create(TEST_TABLE_2);
- BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- File tf = folder.newFile("root_test.pw");
- PrintStream out = new PrintStream(tf);
- String outString = new Credentials("root", new PasswordToken("")).serialize();
- out.println(outString);
- out.close();
-
- MRTokenFileTester.main(new String[] {"root", tf.getAbsolutePath(), TEST_TABLE_1, TEST_TABLE_2});
- assertNull(e1);
-
- Scanner scanner = c.createScanner(TEST_TABLE_2, new Authorizations());
- Iterator<Entry<Key,Value>> iter = scanner.iterator();
- assertTrue(iter.hasNext());
- Entry<Key,Value> entry = iter.next();
- assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
- assertFalse(iter.hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
deleted file mode 100644
index 2a453e3..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class AccumuloFileOutputFormatTest {
- private static final String PREFIX = AccumuloFileOutputFormatTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
- private static final String BAD_TABLE = PREFIX + "_mapreduce_bad_table";
- private static final String TEST_TABLE = PREFIX + "_mapreduce_test_table";
- private static final String EMPTY_TABLE = PREFIX + "_mapreduce_empty_table";
-
- private static AssertionError e1 = null;
- private static AssertionError e2 = null;
-
- @Rule
- public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
-
- @BeforeClass
- public static void setup() throws Exception {
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new PasswordToken(""));
- c.tableOperations().create(EMPTY_TABLE);
- c.tableOperations().create(TEST_TABLE);
- c.tableOperations().create(BAD_TABLE);
- BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
- Mutation m = new Mutation("Key");
- m.put("", "", "");
- bw.addMutation(m);
- bw.close();
- bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
- m = new Mutation("r1");
- m.put("cf1", "cq1", "A&B");
- m.put("cf1", "cq1", "A&B");
- m.put("cf1", "cq2", "A&");
- bw.addMutation(m);
- bw.close();
- }
-
- @Test
- public void testEmptyWrite() throws Exception {
- handleWriteTests(false);
- }
-
- @Test
- public void testRealWrite() throws Exception {
- handleWriteTests(true);
- }
-
- private static class MRTester extends Configured implements Tool {
- private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
- int index = 0;
-
- @Override
- protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
- try {
- try {
- context.write(key, value);
- if (index == 2)
- assertTrue(false);
- } catch (Exception e) {
- assertEquals(2, index);
- }
- } catch (AssertionError e) {
- e1 = e;
- }
- index++;
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- assertEquals(2, index);
- } catch (AssertionError e) {
- e2 = e;
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 4) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <outputfile>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table = args[2];
-
- @SuppressWarnings("deprecation")
- Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
- job.setJarByClass(this.getClass());
-
- job.setInputFormatClass(AccumuloInputFormat.class);
-
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
- AccumuloFileOutputFormat.setOutputPath(job, new Path(args[3]));
-
- job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : Mapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-
- job.setNumReduceTasks(0);
-
- job.waitForCompletion(true);
-
- return job.isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
- }
- }
-
- public void handleWriteTests(boolean content) throws Exception {
- File f = folder.newFile("handleWriteTests");
- f.delete();
- MRTester.main(new String[] {"root", "", content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
-
- assertTrue(f.exists());
- File[] files = f.listFiles(new FileFilter() {
- @Override
- public boolean accept(File file) {
- return file.getName().startsWith("part-m-");
- }
- });
- if (content) {
- assertEquals(1, files.length);
- assertTrue(files[0].exists());
- } else {
- assertEquals(0, files.length);
- }
- }
-
- @Test
- public void writeBadVisibility() throws Exception {
- File f = folder.newFile("writeBadVisibility");
- f.delete();
- MRTester.main(new String[] {"root", "", BAD_TABLE, f.getAbsolutePath()});
- assertNull(e1);
- assertNull(e2);
- }
-
- @Test
- public void validateConfiguration() throws IOException, InterruptedException {
-
- int a = 7;
- long b = 300l;
- long c = 50l;
- long d = 10l;
- String e = "snappy";
-
- @SuppressWarnings("deprecation")
- Job job1 = new Job();
- AccumuloFileOutputFormat.setReplication(job1, a);
- AccumuloFileOutputFormat.setFileBlockSize(job1, b);
- AccumuloFileOutputFormat.setDataBlockSize(job1, c);
- AccumuloFileOutputFormat.setIndexBlockSize(job1, d);
- AccumuloFileOutputFormat.setCompressionType(job1, e);
-
- AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job1);
-
- assertEquals(7, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
- assertEquals(300l, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
- assertEquals(50l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
- assertEquals(10l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
- assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
-
- a = 17;
- b = 1300l;
- c = 150l;
- d = 110l;
- e = "lzo";
-
- @SuppressWarnings("deprecation")
- Job job2 = new Job();
- AccumuloFileOutputFormat.setReplication(job2, a);
- AccumuloFileOutputFormat.setFileBlockSize(job2, b);
- AccumuloFileOutputFormat.setDataBlockSize(job2, c);
- AccumuloFileOutputFormat.setIndexBlockSize(job2, d);
- AccumuloFileOutputFormat.setCompressionType(job2, e);
-
- acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job2);
-
- assertEquals(17, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
- assertEquals(1300l, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
- assertEquals(150l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
- assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
- assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
deleted file mode 100644
index 2500972..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class AccumuloInputFormatTest {
-
- private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName();
-
- /**
- * Check that the iterator configuration is getting stored in the Job conf correctly.
- */
- @Test
- public void testSetIterator() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
-
- IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
- AccumuloInputFormat.addIterator(job, is);
- Configuration conf = job.getConfiguration();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- is.write(new DataOutputStream(baos));
- String iterators = conf.get("AccumuloInputFormat.ScanOpts.Iterators");
- assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators);
- }
-
- @Test
- public void testAddIterator() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
-
- AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
- AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
- IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
- iter.addOption("v1", "1");
- iter.addOption("junk", "\0omg:!\\xyzzy");
- AccumuloInputFormat.addIterator(job, iter);
-
- List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
-
- // Check the list size
- assertTrue(list.size() == 3);
-
- // Walk the list and make sure our settings are correct
- IteratorSetting setting = list.get(0);
- assertEquals(1, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass());
- assertEquals("WholeRow", setting.getName());
- assertEquals(0, setting.getOptions().size());
-
- setting = list.get(1);
- assertEquals(2, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
- assertEquals("Versions", setting.getName());
- assertEquals(0, setting.getOptions().size());
-
- setting = list.get(2);
- assertEquals(3, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
- assertEquals("Count", setting.getName());
- assertEquals(2, setting.getOptions().size());
- assertEquals("1", setting.getOptions().get("v1"));
- assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk"));
- }
-
- /**
- * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There
- * should be no exceptions thrown when trying to parse these types of option entries.
- *
- * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected.
- */
- @Test
- public void testIteratorOptionEncoding() throws Throwable {
- String key = "colon:delimited:key";
- String value = "comma,delimited,value";
- IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
- someSetting.addOption(key, value);
- @SuppressWarnings("deprecation")
- Job job = new Job();
- AccumuloInputFormat.addIterator(job, someSetting);
-
- List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
- assertEquals(1, list.size());
- assertEquals(1, list.get(0).getOptions().size());
- assertEquals(list.get(0).getOptions().get(key), value);
-
- someSetting.addOption(key + "2", value);
- someSetting.setPriority(2);
- someSetting.setName("it2");
- AccumuloInputFormat.addIterator(job, someSetting);
- list = AccumuloInputFormat.getIterators(job);
- assertEquals(2, list.size());
- assertEquals(1, list.get(0).getOptions().size());
- assertEquals(list.get(0).getOptions().get(key), value);
- assertEquals(2, list.get(1).getOptions().size());
- assertEquals(list.get(1).getOptions().get(key), value);
- assertEquals(list.get(1).getOptions().get(key + "2"), value);
- }
-
- /**
- * Test getting iterator settings for multiple iterators set
- */
- @Test
- public void testGetIteratorSettings() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
-
- AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
- AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
- AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
-
- List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
-
- // Check the list size
- assertTrue(list.size() == 3);
-
- // Walk the list and make sure our settings are correct
- IteratorSetting setting = list.get(0);
- assertEquals(1, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
- assertEquals("WholeRow", setting.getName());
-
- setting = list.get(1);
- assertEquals(2, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
- assertEquals("Versions", setting.getName());
-
- setting = list.get(2);
- assertEquals(3, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
- assertEquals("Count", setting.getName());
-
- }
-
- @Test
- public void testSetRegex() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
-
- String regex = ">\"*%<>\'\\";
-
- IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
- RegExFilter.setRegexs(is, regex, null, null, null, false);
- AccumuloInputFormat.addIterator(job, is);
-
- assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName()));
- }
-
- private static AssertionError e1 = null;
- private static AssertionError e2 = null;
-
- private static class MRTester extends Configured implements Tool {
- private static class TestMapper extends Mapper<Key,Value,Key,Value> {
- Key key = null;
- int count = 0;
-
- @Override
- protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
- try {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- } catch (AssertionError e) {
- e1 = e;
- }
- key = new Key(k);
- count++;
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- assertEquals(100, count);
- } catch (AssertionError e) {
- e2 = e;
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 5) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <instanceName> <inputFormatClass>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table = args[2];
-
- String instanceName = args[3];
- String inputFormatClassName = args[4];
- @SuppressWarnings("unchecked")
- Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends InputFormat<?,?>>) Class.forName(inputFormatClassName);
-
- @SuppressWarnings("deprecation")
- Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
- job.setJarByClass(this.getClass());
-
- job.setInputFormatClass(inputFormatClass);
-
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setMockInstance(job, instanceName);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormatClass(NullOutputFormat.class);
-
- job.setNumReduceTasks(0);
-
- job.waitForCompletion(true);
-
- return job.isSuccessful() ? 0 : 1;
- }
-
- public static int main(String[] args) throws Exception {
- return ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args);
- }
- }
-
- @Test
- public void testMap() throws Exception {
- final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
- final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
-
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new PasswordToken(""));
- c.tableOperations().create(TEST_TABLE_1);
- BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- Assert.assertEquals(0, MRTester.main(new String[] {"root", "", TEST_TABLE_1, INSTANCE_NAME, AccumuloInputFormat.class.getCanonicalName()}));
- assertNull(e1);
- assertNull(e2);
- }
-
- @Test
- public void testCorrectRangeInputSplits() throws Exception {
- @SuppressWarnings("deprecation")
- Job job = new Job(new Configuration(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
-
- String username = "user", table = "table", instance = "instance";
- PasswordToken password = new PasswordToken("password");
- Authorizations auths = new Authorizations("foo");
- Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
- boolean isolated = true, localIters = true;
- Level level = Level.WARN;
-
- Instance inst = new MockInstance(instance);
- Connector connector = inst.getConnector(username, password);
- connector.tableOperations().create(table);
-
- AccumuloInputFormat.setConnectorInfo(job, username, password);
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setScanAuthorizations(job, auths);
- AccumuloInputFormat.setMockInstance(job, instance);
- AccumuloInputFormat.setScanIsolation(job, isolated);
- AccumuloInputFormat.setLocalIterators(job, localIters);
- AccumuloInputFormat.fetchColumns(job, fetchColumns);
- AccumuloInputFormat.setLogLevel(job, level);
-
- AccumuloInputFormat aif = new AccumuloInputFormat();
-
- List<InputSplit> splits = aif.getSplits(job);
-
- Assert.assertEquals(1, splits.size());
-
- InputSplit split = splits.get(0);
-
- Assert.assertEquals(RangeInputSplit.class, split.getClass());
-
- RangeInputSplit risplit = (RangeInputSplit) split;
-
- Assert.assertEquals(username, risplit.getPrincipal());
- Assert.assertEquals(table, risplit.getTableName());
- Assert.assertEquals(password, risplit.getToken());
- Assert.assertEquals(auths, risplit.getAuths());
- Assert.assertEquals(instance, risplit.getInstanceName());
- Assert.assertEquals(isolated, risplit.isIsolatedScan());
- Assert.assertEquals(localIters, risplit.usesLocalIterators());
- Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
- Assert.assertEquals(level, risplit.getLogLevel());
- }
-
- @Test
- public void testPartialInputSplitDelegationToConfiguration() throws Exception {
- String user = "testPartialInputSplitUser";
- PasswordToken password = new PasswordToken("");
-
- MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration");
- Connector c = mockInstance.getConnector(user, password);
- c.tableOperations().create("testtable");
- BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- Assert.assertEquals(
- 0,
- MRTester.main(new String[] {user, "", "testtable", "testPartialInputSplitDelegationToConfiguration",
- EmptySplitsAccumuloInputFormat.class.getCanonicalName()}));
- assertNull(e1);
- assertNull(e2);
- }
-
- @Test
- public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception {
- String user = "testPartialFailedInputSplit";
- PasswordToken password = new PasswordToken("");
-
- MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration");
- Connector c = mockInstance.getConnector(user, password);
- c.tableOperations().create("testtable");
- BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- // We should fail before we even get into the Mapper because we can't make the RecordReader
- Assert.assertEquals(
- 1,
- MRTester.main(new String[] {user, "", "testtable", "testPartialFailedInputSplitDelegationToConfiguration",
- BadPasswordSplitsAccumuloInputFormat.class.getCanonicalName()}));
- assertNull(e1);
- assertNull(e2);
- }
-
- @Test
- public void testEmptyColumnFamily() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
- Set<Pair<Text,Text>> cols = new HashSet<Pair<Text,Text>>();
- cols.add(new Pair<Text,Text>(new Text(""), null));
- cols.add(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
- cols.add(new Pair<Text,Text>(new Text(""), new Text("bar")));
- cols.add(new Pair<Text,Text>(new Text(""), new Text("")));
- cols.add(new Pair<Text,Text>(new Text("foo"), new Text("")));
- AccumuloInputFormat.fetchColumns(job, cols);
- Set<Pair<Text,Text>> setCols = AccumuloInputFormat.getFetchedColumns(job);
- assertEquals(cols, setCols);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
deleted file mode 100644
index 05fbbb4..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
-
-public class AccumuloMultiTableInputFormatTest {
-
- private static final String PREFIX = AccumuloMultiTableInputFormatTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
- private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
- private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
-
- private static AssertionError e1 = null;
- private static AssertionError e2 = null;
-
- private static class MRTester extends Configured implements Tool {
-
- private static class TestMapper extends Mapper<Key,Value,Key,Value> {
- Key key = null;
- int count = 0;
-
- @Override
- protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
- try {
- String tableName = ((RangeInputSplit) context.getInputSplit()).getTableName();
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
- assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
- } catch (AssertionError e) {
- e1 = e;
- }
- key = new Key(k);
- count++;
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- assertEquals(100, count);
- } catch (AssertionError e) {
- e2 = e;
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 4) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table1> <table2>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table1 = args[2];
- String table2 = args[3];
-
- @SuppressWarnings("deprecation")
- Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
- job.setJarByClass(this.getClass());
-
- job.setInputFormatClass(AccumuloMultiTableInputFormat.class);
-
- AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
-
- InputTableConfig tableConfig1 = new InputTableConfig();
- InputTableConfig tableConfig2 = new InputTableConfig();
-
- Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
- configMap.put(table1, tableConfig1);
- configMap.put(table2, tableConfig2);
-
- AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
- AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormatClass(NullOutputFormat.class);
-
- job.setNumReduceTasks(0);
-
- job.waitForCompletion(true);
-
- return job.isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
- }
- }
-
- /**
- * Generate incrementing counts and attach table name to the key/value so that order and multi-table data can be verified.
- */
- @Test
- public void testMap() throws Exception {
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new PasswordToken(""));
- c.tableOperations().create(TEST_TABLE_1);
- c.tableOperations().create(TEST_TABLE_2);
- BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1)));
- t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes()));
- bw.addMutation(t1m);
- Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1)));
- t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes()));
- bw2.addMutation(t2m);
- }
- bw.close();
- bw2.close();
-
- MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
- assertNull(e1);
- assertNull(e2);
- }
-
- /**
- * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
- */
- @Test
- public void testInputTableConfigSerialization() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
-
- InputTableConfig tableConfig = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b")))
- .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
- .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
-
- Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
- configMap.put(TEST_TABLE_1, tableConfig);
- configMap.put(TEST_TABLE_2, tableConfig);
-
- AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
-
- assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_1));
- assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_2));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
deleted file mode 100644
index a0cb4e3..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
-
-/**
- *
- */
-public class AccumuloOutputFormatTest {
- private static AssertionError e1 = null;
- private static final String PREFIX = AccumuloOutputFormatTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
- private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
- private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
-
- private static class MRTester extends Configured implements Tool {
- private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
- Key key = null;
- int count = 0;
-
- @Override
- protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
- try {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- } catch (AssertionError e) {
- e1 = e;
- }
- key = new Key(k);
- count++;
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- Mutation m = new Mutation("total");
- m.put("", "", Integer.toString(count));
- context.write(new Text(), m);
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 4) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <inputtable> <outputtable>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table1 = args[2];
- String table2 = args[3];
-
- @SuppressWarnings("deprecation")
- Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
- job.setJarByClass(this.getClass());
-
- job.setInputFormatClass(AccumuloInputFormat.class);
-
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloInputFormat.setInputTableName(job, table1);
- AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormatClass(AccumuloOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
-
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloOutputFormat.setCreateTables(job, false);
- AccumuloOutputFormat.setDefaultTableName(job, table2);
- AccumuloOutputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setNumReduceTasks(0);
-
- job.waitForCompletion(true);
-
- return job.isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
- }
- }
-
- @Test
- public void testBWSettings() throws IOException {
- @SuppressWarnings("deprecation")
- Job job = new Job();
-
- // make sure we aren't testing defaults
- final BatchWriterConfig bwDefaults = new BatchWriterConfig();
- assertNotEquals(7654321l, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
- assertNotEquals(9898989l, bwDefaults.getTimeout(TimeUnit.MILLISECONDS));
- assertNotEquals(42, bwDefaults.getMaxWriteThreads());
- assertNotEquals(1123581321l, bwDefaults.getMaxMemory());
-
- final BatchWriterConfig bwConfig = new BatchWriterConfig();
- bwConfig.setMaxLatency(7654321l, TimeUnit.MILLISECONDS);
- bwConfig.setTimeout(9898989l, TimeUnit.MILLISECONDS);
- bwConfig.setMaxWriteThreads(42);
- bwConfig.setMaxMemory(1123581321l);
- AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
-
- AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
- @Override
- public void checkOutputSpecs(JobContext job) throws IOException {
- BatchWriterConfig bwOpts = getBatchWriterOptions(job);
-
- // passive check
- assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
- assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), bwOpts.getTimeout(TimeUnit.MILLISECONDS));
- assertEquals(bwConfig.getMaxWriteThreads(), bwOpts.getMaxWriteThreads());
- assertEquals(bwConfig.getMaxMemory(), bwOpts.getMaxMemory());
-
- // explicit check
- assertEquals(7654321l, bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
- assertEquals(9898989l, bwOpts.getTimeout(TimeUnit.MILLISECONDS));
- assertEquals(42, bwOpts.getMaxWriteThreads());
- assertEquals(1123581321l, bwOpts.getMaxMemory());
-
- }
- };
- myAOF.checkOutputSpecs(job);
- }
-
- @Test
- public void testMR() throws Exception {
- MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
- Connector c = mockInstance.getConnector("root", new PasswordToken(""));
- c.tableOperations().create(TEST_TABLE_1);
- c.tableOperations().create(TEST_TABLE_2);
- BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
- assertNull(e1);
-
- Scanner scanner = c.createScanner(TEST_TABLE_2, new Authorizations());
- Iterator<Entry<Key,Value>> iter = scanner.iterator();
- assertTrue(iter.hasNext());
- Entry<Key,Value> entry = iter.next();
- assertEquals(Integer.parseInt(new String(entry.getValue().get())), 100);
- assertFalse(iter.hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
deleted file mode 100644
index 2207437..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyValue;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
-
-public class AccumuloRowInputFormatTest {
- private static final String PREFIX = AccumuloRowInputFormatTest.class.getSimpleName();
- private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
- private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
-
- private static final String ROW1 = "row1";
- private static final String ROW2 = "row2";
- private static final String ROW3 = "row3";
- private static final String COLF1 = "colf1";
- private static List<Entry<Key,Value>> row1;
- private static List<Entry<Key,Value>> row2;
- private static List<Entry<Key,Value>> row3;
- private static AssertionError e1 = null;
- private static AssertionError e2 = null;
-
- public AccumuloRowInputFormatTest() {
- row1 = new ArrayList<Entry<Key,Value>>();
- row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes()));
- row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes()));
- row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes()));
- row2 = new ArrayList<Entry<Key,Value>>();
- row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes()));
- row3 = new ArrayList<Entry<Key,Value>>();
- row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
- }
-
- public static void checkLists(final List<Entry<Key,Value>> first, final List<Entry<Key,Value>> second) {
- assertEquals("Sizes should be the same.", first.size(), second.size());
- for (int i = 0; i < first.size(); i++) {
- assertEquals("Keys should be equal.", first.get(i).getKey(), second.get(i).getKey());
- assertEquals("Values should be equal.", first.get(i).getValue(), second.get(i).getValue());
- }
- }
-
- public static void checkLists(final List<Entry<Key,Value>> first, final Iterator<Entry<Key,Value>> second) {
- int entryIndex = 0;
- while (second.hasNext()) {
- final Entry<Key,Value> entry = second.next();
- assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey());
- assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue());
- entryIndex++;
- }
- }
-
- public static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) throws MutationsRejectedException {
- for (Entry<Key,Value> e : list) {
- final Key key = e.getKey();
- final Mutation mutation = new Mutation(key.getRow());
- ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility());
- mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, key.getTimestamp(), e.getValue());
- writer.addMutation(mutation);
- }
- }
-
- private static class MRTester extends Configured implements Tool {
- private static class TestMapper extends Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> {
- int count = 0;
-
- @Override
- protected void map(Text k, PeekingIterator<Entry<Key,Value>> v, Context context) throws IOException, InterruptedException {
- try {
- switch (count) {
- case 0:
- assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
- checkLists(row1, v);
- break;
- case 1:
- assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
- checkLists(row2, v);
- break;
- case 2:
- assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
- checkLists(row3, v);
- break;
- default:
- assertTrue(false);
- }
- } catch (AssertionError e) {
- e1 = e;
- }
- count++;
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- assertEquals(3, count);
- } catch (AssertionError e) {
- e2 = e;
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
- if (args.length != 3) {
- throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
- }
-
- String user = args[0];
- String pass = args[1];
- String table = args[2];
-
- @SuppressWarnings("deprecation")
- Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
- job.setJarByClass(this.getClass());
-
- job.setInputFormatClass(AccumuloRowInputFormat.class);
-
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloRowInputFormat.setMockInstance(job, INSTANCE_NAME);
-
- job.setMapperClass(TestMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
- job.setOutputFormatClass(NullOutputFormat.class);
-
- job.setNumReduceTasks(0);
-
- job.waitForCompletion(true);
-
- return job.isSuccessful() ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
- }
- }
-
- @Test
- public void test() throws Exception {
- final MockInstance instance = new MockInstance(INSTANCE_NAME);
- final Connector conn = instance.getConnector("root", new PasswordToken(""));
- conn.tableOperations().create(TEST_TABLE_1);
- BatchWriter writer = null;
- try {
- writer = conn.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
- insertList(writer, row1);
- insertList(writer, row2);
- insertList(writer, row3);
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
- MRTester.main(new String[] {"root", "", TEST_TABLE_1});
- assertNull(e1);
- assertNull(e2);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java
deleted file mode 100644
index fce7781..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/BadPasswordSplitsAccumuloInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-/**
- * AccumuloInputFormat which returns an "empty" RangeInputSplit
- */
-public class BadPasswordSplitsAccumuloInputFormat extends AccumuloInputFormat {
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException {
- List<InputSplit> splits = super.getSplits(context);
-
- for (InputSplit split : splits) {
- org.apache.accumulo.core.client.mapreduce.RangeInputSplit rangeSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
- rangeSplit.setToken(new PasswordToken("anythingelse"));
- }
-
- return splits;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java
deleted file mode 100644
index dd531c0..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-
-/**
- * AccumuloInputFormat which returns an "empty" RangeInputSplit
- */
-public class EmptySplitsAccumuloInputFormat extends AccumuloInputFormat {
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException {
- List<InputSplit> oldSplits = super.getSplits(context);
- List<InputSplit> newSplits = new ArrayList<InputSplit>(oldSplits.size());
-
- // Copy only the necessary information
- for (InputSplit oldSplit : oldSplits) {
- org.apache.accumulo.core.client.mapreduce.RangeInputSplit newSplit = new org.apache.accumulo.core.client.mapreduce.RangeInputSplit(
- (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) oldSplit);
- newSplits.add(newSplit);
- }
-
- return newSplits;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4dfcb9de/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputTableConfigTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputTableConfigTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputTableConfigTest.java
deleted file mode 100644
index 7f5c7d8..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputTableConfigTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.junit.Before;
-import org.junit.Test;
-
-public class InputTableConfigTest {
-
- private InputTableConfig tableQueryConfig;
-
- @Before
- public void setUp() {
- tableQueryConfig = new InputTableConfig();
- }
-
- @Test
- public void testSerialization_OnlyTable() throws IOException {
- byte[] serialized = serialize(tableQueryConfig);
- InputTableConfig actualConfig = deserialize(serialized);
-
- assertEquals(tableQueryConfig, actualConfig);
- }
-
- @Test
- public void testSerialization_ranges() throws IOException {
- List<Range> ranges = new ArrayList<Range>();
- ranges.add(new Range("a", "b"));
- ranges.add(new Range("c", "d"));
- tableQueryConfig.setRanges(ranges);
-
- byte[] serialized = serialize(tableQueryConfig);
- InputTableConfig actualConfig = deserialize(serialized);
-
- assertEquals(ranges, actualConfig.getRanges());
- }
-
- @Test
- public void testSerialization_columns() throws IOException {
- Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
- columns.add(new Pair<Text,Text>(new Text("cf1"), new Text("cq1")));
- columns.add(new Pair<Text,Text>(new Text("cf2"), null));
- tableQueryConfig.fetchColumns(columns);
-
- byte[] serialized = serialize(tableQueryConfig);
- InputTableConfig actualConfig = deserialize(serialized);
-
- assertEquals(actualConfig.getFetchedColumns(), columns);
- }
-
- @Test
- public void testSerialization_iterators() throws IOException {
- List<IteratorSetting> settings = new ArrayList<IteratorSetting>();
- settings.add(new IteratorSetting(50, "iter", "iterclass"));
- settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
- tableQueryConfig.setIterators(settings);
- byte[] serialized = serialize(tableQueryConfig);
- InputTableConfig actualConfig = deserialize(serialized);
- assertEquals(actualConfig.getIterators(), settings);
-
- }
-
- private byte[] serialize(InputTableConfig tableQueryConfig) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- tableQueryConfig.write(new DataOutputStream(baos));
- baos.close();
- return baos.toByteArray();
- }
-
- private InputTableConfig deserialize(byte[] bytes) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- InputTableConfig actualConfig = new InputTableConfig(new DataInputStream(bais));
- bais.close();
- return actualConfig;
- }
-}