You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/24 01:33:42 UTC
[11/31] git commit: ACCUMULO-1854 Get the mapreduce unit tests working again
ACCUMULO-1854 Get the mapreduce unit tests working again
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e08736d7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e08736d7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e08736d7
Branch: refs/heads/master
Commit: e08736d7900b22d83a0e28c93696f6ef20086942
Parents: f4e4c39
Author: Josh Elser <el...@apache.org>
Authored: Thu Nov 21 21:26:09 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Nov 21 21:26:09 2013 -0500
----------------------------------------------------------------------
.../core/client/mapreduce/InputFormatBase.java | 38 +-
.../core/client/mapreduce/RangeInputSplit.java | 69 ---
.../mapreduce/AccumuloInputFormatTest.java | 159 ++++++
.../mapreduce/AccumuloInputFormatTest1.java | 534 -------------------
.../client/mapreduce/RangeInputSplitTest.java | 19 +-
5 files changed, 201 insertions(+), 618 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index e17b46d..4e5b5a8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -814,6 +814,24 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
String tableName = getInputTableName(context);
boolean autoAdjust = getAutoAdjustRanges(context);
List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context);
+ Instance instance = getInstance(context);
+ boolean offline = isOfflineScan(context);
+ boolean isolated = isIsolated(context);
+ boolean localIterators = usesLocalIterators(context);
+ boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass()));
+ Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(context);
+ Authorizations auths = getScanAuthorizations(context);
+ String principal = getPrincipal(context);
+
+ AuthenticationToken token;
+ try {
+ token = CredentialHelper.extractToken(getTokenClass(context), getToken(context));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ List<IteratorSetting> iterators = getIterators(context);
+ Level logLevel = getLogLevel(context);
if (ranges.isEmpty()) {
ranges = new ArrayList<Range>(1);
@@ -832,7 +850,6 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
binnedRanges = binOfflineTable(context, tableName, ranges);
}
} else {
- Instance instance = getInstance(context);
String tableId = null;
tl = getTabletLocator(context);
// its possible that the cache could contain complete, but old information about a tables tablets... so clear it
@@ -896,6 +913,25 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
if (!autoAdjust)
for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet())
splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0])));
+
+ for (InputSplit inputSplit : splits) {
+ RangeInputSplit split = (RangeInputSplit) inputSplit;
+
+ split.setTable(tableName);
+ split.setOffline(offline);
+ split.setIsolatedScan(isolated);
+ split.setUsesLocalIterators(localIterators);
+ split.setMockInstance(mockInstance);
+ split.setFetchedColumns(fetchedColumns);
+ split.setPrincipal(principal);
+ split.setToken(token);
+ split.setInstanceName(instance.getInstanceName());
+ split.setZooKeepers(instance.getZooKeepers());
+ split.setAuths(auths);
+ split.setIterators(iterators);
+ split.setLogLevel(logLevel);
+ }
+
return splits;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 67b839b..6decdc6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@ -53,7 +53,6 @@ public class RangeInputSplit extends InputSplit implements Writable {
private Range range;
private String[] locations;
private String table, instanceName, zooKeepers, principal;
- private String rowRegex, colfamRegex, colqualRegex, valueRegex;
private AuthenticationToken token;
private Boolean offline, mockInstance, isolatedScan, localIterators;
private Authorizations auths;
@@ -164,22 +163,6 @@ public class RangeInputSplit extends InputSplit implements Writable {
}
if (in.readBoolean()) {
- rowRegex = in.readUTF();
- }
-
- if (in.readBoolean()) {
- colfamRegex = in.readUTF();
- }
-
- if (in.readBoolean()) {
- colqualRegex = in.readUTF();
- }
-
- if (in.readBoolean()) {
- valueRegex = in.readUTF();
- }
-
- if (in.readBoolean()) {
int numColumns = in.readInt();
List<String> columns = new ArrayList<String>(numColumns);
for (int i = 0; i < numColumns; i++) {
@@ -248,26 +231,6 @@ public class RangeInputSplit extends InputSplit implements Writable {
out.writeBoolean(mockInstance);
}
- out.writeBoolean(null != rowRegex);
- if (null != rowRegex) {
- out.writeUTF(rowRegex);
- }
-
- out.writeBoolean(null != colfamRegex);
- if (null != colfamRegex) {
- out.writeUTF(colfamRegex);
- }
-
- out.writeBoolean(null != colqualRegex);
- if (null != colqualRegex) {
- out.writeUTF(colqualRegex);
- }
-
- out.writeBoolean(null != valueRegex);
- if (null != valueRegex) {
- out.writeUTF(valueRegex);
- }
-
out.writeBoolean(null != fetchedColumns);
if (null != fetchedColumns) {
String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
@@ -391,38 +354,6 @@ public class RangeInputSplit extends InputSplit implements Writable {
this.locations = locations;
}
- public String getRowRegex() {
- return rowRegex;
- }
-
- public void setRowRegex(String rowRegex) {
- this.rowRegex = rowRegex;
- }
-
- public String getColfamRegex() {
- return colfamRegex;
- }
-
- public void setColfamRegex(String colfamRegex) {
- this.colfamRegex = colfamRegex;
- }
-
- public String getColqualRegex() {
- return colqualRegex;
- }
-
- public void setColqualRegex(String colqualRegex) {
- this.colqualRegex = colqualRegex;
- }
-
- public String getValueRegex() {
- return valueRegex;
- }
-
- public void setValueRegex(String valueRegex) {
- this.valueRegex = valueRegex;
- }
-
public Boolean isMockInstance() {
return mockInstance;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 0ee03a2..f9ccdf1 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -23,11 +23,14 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
@@ -36,16 +39,24 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.Assert;
import org.junit.Test;
public class AccumuloInputFormatTest {
@@ -318,4 +329,152 @@ public class AccumuloInputFormatTest {
assertNull(e1);
assertNull(e2);
}
+
+ /**
+ * Verifies that getSplits copies the job-level scan configuration into each RangeInputSplit it returns.
+ * <p>
+ * The job points at a MockInstance whose table is created at the start of this test with no data written by it; the test expects
+ * exactly one split and checks principal, table, token, authorizations, instance name, isolation, local iterators, fetched columns
+ * and log level on that split.
+ * NOTE(review): getSplits also sets the offline, mock-instance, ZooKeepers and iterator settings on each split, but those fields are
+ * not asserted here.
+ */
+ // presumably suppresses the deprecation warning on the Job(Configuration, String) constructor -- TODO confirm
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testCorrectRangeInputSplits() throws Exception {
+ Job job = new Job(new Configuration(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+
+ // Distinctive values for every setting that the split is expected to carry.
+ String username = "user", table = "table", instance = "instance";
+ PasswordToken password = new PasswordToken("password");
+ Authorizations auths = new Authorizations("foo");
+ Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
+ boolean isolated = true, localIters = true;
+ Level level = Level.WARN;
+
+ // The table must exist in the mock instance before splits can be computed.
+ Instance inst = new MockInstance(instance);
+ Connector connector = inst.getConnector(username, password);
+ connector.tableOperations().create(table);
+
+ AccumuloInputFormat.setConnectorInfo(job, username, password);
+ AccumuloInputFormat.setInputTableName(job, table);
+ AccumuloInputFormat.setScanAuthorizations(job, auths);
+ AccumuloInputFormat.setMockInstance(job, instance);
+ AccumuloInputFormat.setScanIsolation(job, isolated);
+ AccumuloInputFormat.setLocalIterators(job, localIters);
+ AccumuloInputFormat.fetchColumns(job, fetchColumns);
+ AccumuloInputFormat.setLogLevel(job, level);
+
+ AccumuloInputFormat aif = new AccumuloInputFormat();
+
+ List<InputSplit> splits = aif.getSplits(job);
+
+ Assert.assertEquals(1, splits.size());
+
+ InputSplit split = splits.get(0);
+
+ Assert.assertEquals(RangeInputSplit.class, split.getClass());
+
+ RangeInputSplit risplit = (RangeInputSplit) split;
+
+ // Each job-level setting configured above must be present on the split itself.
+ Assert.assertEquals(username, risplit.getPrincipal());
+ Assert.assertEquals(table, risplit.getTable());
+ Assert.assertEquals(password, risplit.getToken());
+ Assert.assertEquals(auths, risplit.getAuths());
+ Assert.assertEquals(instance, risplit.getInstanceName());
+ Assert.assertEquals(isolated, risplit.isIsolatedScan());
+ Assert.assertEquals(localIters, risplit.usesLocalIterators());
+ Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
+ Assert.assertEquals(level, risplit.getLogLevel());
+ }
+
+ /**
+ * Mapper that asserts it receives, in order, the entries written by the delegation tests in this class.
+ * <p>
+ * The n-th entry (0-based) must have row {@code String.format("%09x", n + 1)} and value {@code String.format("%09x", n)}. Each value
+ * must therefore also equal the previous entry's row.
+ */
+ static class TestMapper extends Mapper<Key,Value,Key,Value> {
+ // Copy of the previously mapped key; null until the first entry has been seen.
+ Key key = null;
+ // Number of entries mapped so far.
+ int count = 0;
+
+ @Override
+ protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+ // Rows start at 1 and values at 0, so the current value is the previous entry's row.
+ // NOTE(review): new String(byte[]) uses the platform default charset; harmless for ASCII hex but not charset-explicit.
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ // Keep a copy rather than the reference; presumably the reader may reuse its Key instance -- TODO confirm.
+ key = new Key(k);
+ count++;
+ }
+ }
+
+ /**
+ * Verifies that a RecordReader given an empty RangeInputSplit falls back to the connector, table, authorization and instance
+ * settings stored in the Job's Configuration.
+ * <p>
+ * The test writes 100 rows to a MockInstance and reads them back through TestMapper, which asserts on each entry it receives.
+ * NOTE(review): nothing asserts that TestMapper actually saw all 100 entries (e.g. {@code mapper.count == 100}), so a reader that
+ * returned no data would also pass.
+ */
+ @Test
+ public void testPartialInputSplitDelegationToConfiguration() throws Exception {
+ String user = "testPartialInputSplitUser";
+ PasswordToken password = new PasswordToken("");
+
+ // Populate the table with the row/value pattern that TestMapper checks.
+ MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration");
+ Connector c = mockInstance.getConnector(user, password);
+ c.tableOperations().create("testtable");
+ BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ // All connection and scan settings live only in the Job's Configuration.
+ Job job = new Job(new Configuration());
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ job.setMapperClass(TestMapper.class);
+ job.setNumReduceTasks(0);
+ AccumuloInputFormat.setConnectorInfo(job, user, password);
+ AccumuloInputFormat.setInputTableName(job, "testtable");
+ AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
+ AccumuloInputFormat.setMockInstance(job, "testPartialInputSplitDelegationToConfiguration");
+
+ AccumuloInputFormat input = new AccumuloInputFormat();
+ // The computed splits are only size-checked; the read below deliberately uses an empty split instead.
+ List<InputSplit> splits = input.getSplits(job);
+ // NOTE(review): JUnit's assertEquals takes (expected, actual); these arguments are reversed, which only affects the failure message.
+ assertEquals(splits.size(), 1);
+
+ TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
+
+ RangeInputSplit emptySplit = new RangeInputSplit();
+
+ // Using an empty split should fall back to the information in the Job's Configuration
+ TaskAttemptID id = new TaskAttemptID();
+ TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
+ RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
+ Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
+ reader.initialize(emptySplit, context);
+ mapper.run(context);
+ }
+
+ /**
+ * Appears to verify that credentials set on an otherwise empty RangeInputSplit are used instead of the Job's Configuration.
+ * <p>
+ * The split carries principal "root" with token "anythingelse", which differ from the user configured on the Job, and the test
+ * expects an IOException.
+ * NOTE(review): the statement expected to throw cannot be seen from this test (presumably createRecordReader/initialize or the
+ * read inside mapper.run) -- TODO confirm. Unlike testPartialInputSplitDelegationToConfiguration, this Job sets no scan
+ * authorizations.
+ */
+ @Test(expected = IOException.class)
+ public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception {
+ String user = "testPartialFailedInputSplit";
+ PasswordToken password = new PasswordToken("");
+
+ // Populate the table with the row/value pattern that TestMapper checks.
+ MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration");
+ Connector c = mockInstance.getConnector(user, password);
+ c.tableOperations().create("testtable");
+ BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ Job job = new Job(new Configuration());
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ job.setMapperClass(TestMapper.class);
+ job.setNumReduceTasks(0);
+ AccumuloInputFormat.setConnectorInfo(job, user, password);
+ AccumuloInputFormat.setInputTableName(job, "testtable");
+ AccumuloInputFormat.setMockInstance(job, "testPartialFailedInputSplitDelegationToConfiguration");
+
+ AccumuloInputFormat input = new AccumuloInputFormat();
+ List<InputSplit> splits = input.getSplits(job);
+ // NOTE(review): JUnit's assertEquals takes (expected, actual); these arguments are reversed, which only affects the failure message.
+ assertEquals(splits.size(), 1);
+
+ TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
+
+ // Only the credentials are set on the split; every other field is left empty.
+ RangeInputSplit emptySplit = new RangeInputSplit();
+ emptySplit.setPrincipal("root");
+ emptySplit.setToken(new PasswordToken("anythingelse"));
+
+ // The split is otherwise empty, but its principal/token override the user configured on the Job; the test expects this to end in an IOException.
+ TaskAttemptID id = new TaskAttemptID();
+ TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
+ RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
+ Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
+ reader.initialize(emptySplit, context);
+ mapper.run(context);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java
deleted file mode 100644
index 7239b01..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest1.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RegexType;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class AccumuloInputFormatTest {
-
- @After
- public void tearDown() throws Exception {}
-
- /**
- * Test basic setting & getting of max versions.
- *
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @Test
- public void testMaxVersions() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
- AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1);
- int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration());
- assertEquals(1, version);
- }
-
- /**
- * Test max versions with an invalid value.
- *
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @Test(expected = IOException.class)
- public void testMaxVersionsLessThan1() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
- AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0);
- }
-
- /**
- * Test no max version configured.
- */
- @Test
- public void testNoMaxVersion() {
- JobContext job = new JobContext(new Configuration(), new JobID());
- assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration()));
- }
-
- /**
- * Check that the iterator configuration is getting stored in the Job conf correctly.
- */
- @SuppressWarnings("deprecation")
- @Test
- public void testSetIterator() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
- AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
- Configuration conf = job.getConfiguration();
- String iterators = conf.get("AccumuloInputFormat.iterators");
- assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
- }
-
- @Test
- public void testAddIterator() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
- AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
- AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
- IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
- iter.addOption("v1", "1");
- iter.addOption("junk", "\0omg:!\\xyzzy");
- AccumuloInputFormat.addIterator(job.getConfiguration(), iter);
-
- List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job.getConfiguration());
-
- // Check the list size
- assertTrue(list.size() == 3);
-
- // Walk the list and make sure our settings are correct
- AccumuloIterator setting = list.get(0);
- assertEquals(1, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass());
- assertEquals("WholeRow", setting.getIteratorName());
-
- setting = list.get(1);
- assertEquals(2, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
- assertEquals("Versions", setting.getIteratorName());
-
- setting = list.get(2);
- assertEquals(3, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
- assertEquals("Count", setting.getIteratorName());
-
- List<AccumuloIteratorOption> iteratorOptions = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
- assertEquals(2, iteratorOptions.size());
- assertEquals("Count", iteratorOptions.get(0).getIteratorName());
- assertEquals("Count", iteratorOptions.get(1).getIteratorName());
- assertEquals("v1", iteratorOptions.get(0).getKey());
- assertEquals("1", iteratorOptions.get(0).getValue());
- assertEquals("junk", iteratorOptions.get(1).getKey());
- assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue());
- }
-
- /**
- * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There
- * should be no exceptions thrown when trying to parse these types of option entries.
- *
- * This test makes sure that the expected raw values, as appears in the Job, are equal to what's expected.
- */
- @Test
- public void testIteratorOptionEncoding() throws Throwable {
- String key = "colon:delimited:key";
- String value = "comma,delimited,value";
- IteratorSetting someSetting = new IteratorSetting(1, "iterator", "Iterator.class");
- someSetting.addOption(key, value);
- Job job = new Job();
- AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting);
-
- final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString();
-
- assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options"));
-
- List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
- assertEquals(1, opts.size());
- assertEquals(opts.get(0).getKey(), key);
- assertEquals(opts.get(0).getValue(), value);
-
- someSetting.addOption(key + "2", value);
- someSetting.setPriority(2);
- someSetting.setName("it2");
- AccumuloInputFormat.addIterator(job.getConfiguration(), someSetting);
- opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
- assertEquals(3, opts.size());
- for (AccumuloIteratorOption opt : opts) {
- assertEquals(opt.getKey().substring(0, key.length()), key);
- assertEquals(opt.getValue(), value);
- }
- }
-
- /**
- * Test getting iterator settings for multiple iterators set
- */
- @SuppressWarnings("deprecation")
- @Test
- public void testGetIteratorSettings() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
- AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
- AccumuloInputFormat.setIterator(job, 2, "org.apache.accumulo.core.iterators.VersioningIterator", "Versions");
- AccumuloInputFormat.setIterator(job, 3, "org.apache.accumulo.core.iterators.CountingIterator", "Count");
-
- List<AccumuloIterator> list = AccumuloInputFormat.getIterators(job);
-
- // Check the list size
- assertTrue(list.size() == 3);
-
- // Walk the list and make sure our settings are correct
- AccumuloIterator setting = list.get(0);
- assertEquals(1, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
- assertEquals("WholeRow", setting.getIteratorName());
-
- setting = list.get(1);
- assertEquals(2, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
- assertEquals("Versions", setting.getIteratorName());
-
- setting = list.get(2);
- assertEquals(3, setting.getPriority());
- assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
- assertEquals("Count", setting.getIteratorName());
-
- }
-
- /**
- * Check that the iterator options are getting stored in the Job conf correctly.
- */
- @SuppressWarnings("deprecation")
- @Test
- public void testSetIteratorOption() {
- JobContext job = new JobContext(new Configuration(), new JobID());
- AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue");
-
- Configuration conf = job.getConfiguration();
- String options = conf.get("AccumuloInputFormat.iterators.options");
- assertEquals(new String("someIterator:aKey:aValue"), options);
- }
-
- /**
- * Test getting iterator options for multiple options set
- */
- @SuppressWarnings("deprecation")
- @Test
- public void testGetIteratorOption() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
- AccumuloInputFormat.setIteratorOption(job, "iterator1", "key1", "value1");
- AccumuloInputFormat.setIteratorOption(job, "iterator2", "key2", "value2");
- AccumuloInputFormat.setIteratorOption(job, "iterator3", "key3", "value3");
-
- List<AccumuloIteratorOption> list = AccumuloInputFormat.getIteratorOptions(job);
-
- // Check the list size
- assertEquals(3, list.size());
-
- // Walk the list and make sure all the options are correct
- AccumuloIteratorOption option = list.get(0);
- assertEquals("iterator1", option.getIteratorName());
- assertEquals("key1", option.getKey());
- assertEquals("value1", option.getValue());
-
- option = list.get(1);
- assertEquals("iterator2", option.getIteratorName());
- assertEquals("key2", option.getKey());
- assertEquals("value2", option.getValue());
-
- option = list.get(2);
- assertEquals("iterator3", option.getIteratorName());
- assertEquals("key3", option.getKey());
- assertEquals("value3", option.getValue());
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void testSetRegex() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
- String regex = ">\"*%<>\'\\";
-
- AccumuloInputFormat.setRegex(job, RegexType.ROW, regex);
-
- assertTrue(regex.equals(AccumuloInputFormat.getRegex(job, RegexType.ROW)));
- }
-
- static class TestMapper extends Mapper<Key,Value,Key,Value> {
- Key key = null;
- int count = 0;
-
- @Override
- protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- key = new Key(k);
- count++;
- }
- }
-
- @Test
- public void testMap() throws Exception {
- MockInstance mockInstance = new MockInstance("testmapinstance");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create("testtable");
- BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- Job job = new Job(new Configuration());
- job.setInputFormatClass(AccumuloInputFormat.class);
- job.setMapperClass(TestMapper.class);
- job.setNumReduceTasks(0);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
-
- AccumuloInputFormat input = new AccumuloInputFormat();
- List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
-
- TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
- for (InputSplit split : splits) {
- RangeInputSplit risplit = (RangeInputSplit) split;
- Assert.assertEquals("root", risplit.getUsername());
- Assert.assertArrayEquals(new byte[0], risplit.getPassword());
- Assert.assertEquals("testtable", risplit.getTable());
- Assert.assertEquals(new Authorizations(), risplit.getAuths());
- Assert.assertEquals("testmapinstance", risplit.getInstanceName());
-
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(split, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
- reader.initialize(split, context);
- mapper.run(context);
- }
- }
-
- @Test
- public void testSimple() throws Exception {
- MockInstance mockInstance = new MockInstance("testmapinstance");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create("testtable2");
- BatchWriter bw = c.createBatchWriter("testtable2", 10000L, 1000L, 4);
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- JobContext job = new JobContext(new Configuration(), new JobID());
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
- AccumuloInputFormat input = new AccumuloInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
- RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
- rr.initialize(ris, tac);
-
- TestMapper mapper = new TestMapper();
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris);
- while (rr.nextKeyValue()) {
- mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
- }
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void testRegex() throws Exception {
- MockInstance mockInstance = new MockInstance("testmapinstance");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create("testtable3");
- BatchWriter bw = c.createBatchWriter("testtable3", 10000L, 1000L, 4);
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- JobContext job = new JobContext(new Configuration(), new JobID());
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable3", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
- final String regex = ".*1.*";
- AccumuloInputFormat.setRegex(job, RegexType.ROW, regex);
- AccumuloInputFormat input = new AccumuloInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
- RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
- rr.initialize(ris, tac);
-
- Pattern p = Pattern.compile(regex);
- while (rr.nextKeyValue()) {
- Assert.assertTrue(p.matcher(rr.getCurrentKey().getRow().toString()).matches());
- }
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void testCorrectRangeInputSplits() throws Exception {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
- String username = "user", table = "table", rowRegex = "row.*", colfRegex = "colf.*", colqRegex = "colq.*";
- String valRegex = "val.*", instance = "instance";
- byte[] password = "password".getBytes();
- Authorizations auths = new Authorizations("foo");
- Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
- boolean isolated = true, localIters = true;
- int maxVersions = 5;
- Level level = Level.WARN;
-
- Instance inst = new MockInstance(instance);
- Connector connector = inst.getConnector(username, password);
- connector.tableOperations().create(table);
-
- AccumuloInputFormat.setInputInfo(job, username, password, table, auths);
- AccumuloInputFormat.setMockInstance(job, instance);
- AccumuloInputFormat.setRegex(job, RegexType.ROW, rowRegex);
- AccumuloInputFormat.setRegex(job, RegexType.COLUMN_FAMILY, colfRegex);
- AccumuloInputFormat.setRegex(job, RegexType.COLUMN_QUALIFIER, colqRegex);
- AccumuloInputFormat.setRegex(job, RegexType.VALUE, valRegex);
- AccumuloInputFormat.setIsolated(job, isolated);
- AccumuloInputFormat.setLocalIterators(job, localIters);
- AccumuloInputFormat.setMaxVersions(job, maxVersions);
- AccumuloInputFormat.fetchColumns(job, fetchColumns);
- AccumuloInputFormat.setLogLevel(job, level);
-
- AccumuloInputFormat aif = new AccumuloInputFormat();
-
- List<InputSplit> splits = aif.getSplits(job);
-
- Assert.assertEquals(1, splits.size());
-
- InputSplit split = splits.get(0);
-
- Assert.assertEquals(RangeInputSplit.class, split.getClass());
-
- RangeInputSplit risplit = (RangeInputSplit) split;
-
- Assert.assertEquals(username, risplit.getUsername());
- Assert.assertEquals(table, risplit.getTable());
- Assert.assertArrayEquals(password, risplit.getPassword());
- Assert.assertEquals(auths, risplit.getAuths());
- Assert.assertEquals(instance, risplit.getInstanceName());
- Assert.assertEquals(rowRegex, risplit.getRowRegex());
- Assert.assertEquals(colfRegex, risplit.getColfamRegex());
- Assert.assertEquals(colqRegex, risplit.getColqualRegex());
- Assert.assertEquals(valRegex, risplit.getValueRegex());
- Assert.assertEquals(isolated, risplit.isIsolatedScan());
- Assert.assertEquals(localIters, risplit.usesLocalIterators());
- Assert.assertEquals(maxVersions, risplit.getMaxVersions().intValue());
- Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
- Assert.assertEquals(level, risplit.getLogLevel());
- }
-
- @Test
- public void testPartialInputSplitDelegationToConfiguration() throws Exception {
- MockInstance mockInstance = new MockInstance("testPartialInputSplitDelegationToConfiguration");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create("testtable");
- BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- Job job = new Job(new Configuration());
- job.setInputFormatClass(AccumuloInputFormat.class);
- job.setMapperClass(TestMapper.class);
- job.setNumReduceTasks(0);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialInputSplitDelegationToConfiguration");
-
- AccumuloInputFormat input = new AccumuloInputFormat();
- List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
-
- TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
-
- RangeInputSplit emptySplit = new RangeInputSplit();
-
- // Using an empty split should fall back to the information in the Job's Configuration
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
- reader.initialize(emptySplit, context);
- mapper.run(context);
- }
-
- @Test(expected = IOException.class)
- public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception {
- MockInstance mockInstance = new MockInstance("testPartialFailedInputSplitDelegationToConfiguration");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create("testtable");
- BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- Job job = new Job(new Configuration());
- job.setInputFormatClass(AccumuloInputFormat.class);
- job.setMapperClass(TestMapper.class);
- job.setNumReduceTasks(0);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testPartialFailedInputSplitDelegationToConfiguration");
-
- AccumuloInputFormat input = new AccumuloInputFormat();
- List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
-
- TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
-
- RangeInputSplit emptySplit = new RangeInputSplit();
- emptySplit.setUsername("root");
- emptySplit.setPassword("anythingelse".getBytes());
-
- // Using an empty split should fall back to the information in the Job's Configuration
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
- reader.initialize(emptySplit, context);
- mapper.run(context);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e08736d7/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
index 22fb6e1..f6c604f 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
@@ -9,6 +9,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
@@ -53,14 +54,9 @@ public class RangeInputSplitTest {
split.setOffline(true);
split.setIsolatedScan(true);
split.setUsesLocalIterators(true);
- split.setMaxVersions(5);
- split.setRowRegex("row");
- split.setColfamRegex("colf");
- split.setColqualRegex("colq");
- split.setValueRegex("value");
split.setFetchedColumns(fetchedColumns);
- split.setPassword("password".getBytes());
- split.setUsername("root");
+ split.setToken(new PasswordToken("password"));
+ split.setPrincipal("root");
split.setInstanceName("instance");
split.setMockInstance(true);
split.setZooKeepers("localhost");
@@ -83,14 +79,9 @@ public class RangeInputSplitTest {
Assert.assertEquals(split.isOffline(), newSplit.isOffline());
Assert.assertEquals(split.isIsolatedScan(), newSplit.isOffline());
Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators());
- Assert.assertEquals(split.getMaxVersions(), newSplit.getMaxVersions());
- Assert.assertEquals(split.getRowRegex(), newSplit.getRowRegex());
- Assert.assertEquals(split.getColfamRegex(), newSplit.getColfamRegex());
- Assert.assertEquals(split.getColqualRegex(), newSplit.getColqualRegex());
- Assert.assertEquals(split.getValueRegex(), newSplit.getValueRegex());
Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns());
- Assert.assertEquals(new String(split.getPassword()), new String(newSplit.getPassword()));
- Assert.assertEquals(split.getUsername(), newSplit.getUsername());
+ Assert.assertEquals(split.getToken(), newSplit.getToken());
+ Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal());
Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName());
Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance());
Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());