You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/11/26 16:47:29 UTC
[25/38] git commit: ACCUMULO-286 introduced ContextFactory to assist
with testing InputFormats and OutputFormats under different versions of
hadoop
ACCUMULO-286 introduced ContextFactory to assist with testing InputFormats and OutputFormats under different versions of hadoop
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/trunk@1229596 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit c0a0afdcceedba2947949761ca13ecc42ff8f9c1)
Reason: Testing
Author: Billie Rinaldi <bi...@apache.org>
Ref: ACCUMULO-1792
Expands change to CoordinateRecoveryTask to remove previous addition of InterruptedException catching in CoordinateRecoveryTask to maintain compile compatibility with non-2.0.2-alpha versions.
Differs from upstream by expanding test modifications to include depecrated ones removed in 1.5.x and tests only found in the 1.4.x branch.
Author: Sean Busbey <bu...@cloudera.com>
Signed-off-by: Eric Newton <er...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a3264e4f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a3264e4f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a3264e4f
Branch: refs/heads/1.5.1-SNAPSHOT
Commit: a3264e4f1b188e0f4808120fa39d5c1695b6b01c
Parents: 5d22af4
Author: Billie Rinaldi <bi...@apache.org>
Authored: Tue Jan 10 15:27:57 2012 +0000
Committer: Eric Newton <er...@gmail.com>
Committed: Mon Nov 25 16:06:42 2013 -0500
----------------------------------------------------------------------
.../accumulo/core/util/ContextFactory.java | 169 +++++++++++++++++++
.../mapreduce/AccumuloFileOutputFormatTest.java | 8 +-
.../mapreduce/AccumuloInputFormatTest.java | 71 ++++----
.../mapreduce/AccumuloRowInputFormatTest.java | 8 +-
.../lib/partition/RangePartitionerTest.java | 5 +-
.../helloworld/InsertWithOutputFormat.java | 10 +-
.../simple/filedata/ChunkInputFormatTest.java | 16 +-
.../server/master/CoordinateRecoveryTask.java | 5 +-
.../apache/accumulo/server/master/LogSort.java | 2 +-
9 files changed, 224 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
new file mode 100644
index 0000000..67819da
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A factory to allow applications to deal with inconsistencies between MapReduce Context Objects API between hadoop-0.20 and later versions. This code is based
+ * on org.apache.hadoop.mapreduce.ContextFactory in hadoop-mapred-0.22.0.
+ */
+public class ContextFactory {
+
+ private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> TASK_ID_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+ private static final Class<?> TASK_TYPE_CLASS;
+ private static final boolean useV21;
+
+ static {
+ boolean v21 = true;
+ final String PACKAGE = "org.apache.hadoop.mapreduce";
+ try {
+ Class.forName(PACKAGE + ".task.JobContextImpl");
+ } catch (ClassNotFoundException cnfe) {
+ v21 = false;
+ }
+ useV21 = v21;
+ Class<?> jobContextCls;
+ Class<?> taskContextCls;
+ Class<?> mapCls;
+ Class<?> mapContextCls;
+ Class<?> innerMapContextCls;
+ try {
+ if (v21) {
+ jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
+ taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
+ TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
+ mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+ mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+ innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
+ } else {
+ jobContextCls = Class.forName(PACKAGE + ".JobContext");
+ taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
+ TASK_TYPE_CLASS = null;
+ mapContextCls = Class.forName(PACKAGE + ".MapContext");
+ mapCls = Class.forName(PACKAGE + ".Mapper");
+ innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Can't find class", e);
+ }
+ try {
+ JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
+ JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
+ TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ if (useV21) {
+ TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, TASK_TYPE_CLASS, int.class, int.class);
+ TASK_ID_CONSTRUCTOR.setAccessible(true);
+ MAP_CONSTRUCTOR = mapCls.getConstructor();
+ MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, MapContext.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR = mapContextCls.getDeclaredConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+ OutputCommitter.class, StatusReporter.class, InputSplit.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+ } else {
+ TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, boolean.class, int.class, int.class);
+ TASK_ID_CONSTRUCTOR.setAccessible(true);
+ MAP_CONSTRUCTOR = null;
+ MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+ OutputCommitter.class, StatusReporter.class, InputSplit.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+ }
+ MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ } catch (SecurityException e) {
+ throw new IllegalArgumentException("Can't run constructor ", e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("Can't find constructor ", e);
+ }
+ }
+
+ public static JobContext createJobContext() {
+ return createJobContext(new Configuration());
+ }
+
+ public static JobContext createJobContext(Configuration conf) {
+ try {
+ return (JobContext) JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, new JobID("local", 0));
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+
+ public static TaskAttemptContext createTaskAttemptContext(JobContext job) {
+ return createTaskAttemptContext(job.getConfiguration());
+ }
+
+ public static TaskAttemptContext createTaskAttemptContext(Configuration conf) {
+ try {
+ if (useV21)
+ return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf,
+ TASK_ID_CONSTRUCTOR.newInstance("local", 0, TASK_TYPE_CLASS.getEnumConstants()[0], 0, 0));
+ else
+ return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, TASK_ID_CONSTRUCTOR.newInstance("local", 0, true, 0, 0));
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
+ RecordWriter<K2,V2> writer, InputSplit split) {
+ try {
+ if (useV21) {
+ Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null, split);
+ return (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
+ } else {
+ return (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null,
+ split);
+ }
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
index 342455f..84dce27 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
@@ -26,13 +26,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,13 +42,13 @@ public class AccumuloFileOutputFormatTest {
@Before
public void setup() {
- job = new JobContext(new Configuration(), new JobID());
+ job = ContextFactory.createJobContext();
Path file = new Path(System.getenv("ACCUMULO_HOME") + "/target/");
f = new Path(file, "_temporary");
job.getConfiguration().set("mapred.output.dir", file.toString());
- tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ tac = ContextFactory.createTaskAttemptContext(job);
}
@After
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 7239b01..f2e2f2c 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -39,16 +39,15 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
@@ -67,7 +66,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testMaxVersions() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1);
int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration());
assertEquals(1, version);
@@ -81,7 +80,7 @@ public class AccumuloInputFormatTest {
*/
@Test(expected = IOException.class)
public void testMaxVersionsLessThan1() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0);
}
@@ -90,7 +89,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testNoMaxVersion() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration()));
}
@@ -100,8 +99,8 @@ public class AccumuloInputFormatTest {
@SuppressWarnings("deprecation")
@Test
public void testSetIterator() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
+ JobContext job = ContextFactory.createJobContext();
+
AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
Configuration conf = job.getConfiguration();
String iterators = conf.get("AccumuloInputFormat.iterators");
@@ -110,8 +109,8 @@ public class AccumuloInputFormatTest {
@Test
public void testAddIterator() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
+ JobContext job = ContextFactory.createJobContext();
+
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
@@ -192,8 +191,8 @@ public class AccumuloInputFormatTest {
@SuppressWarnings("deprecation")
@Test
public void testGetIteratorSettings() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
+ JobContext job = ContextFactory.createJobContext();
+
AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow");
AccumuloInputFormat.setIterator(job, 2, "org.apache.accumulo.core.iterators.VersioningIterator", "Versions");
AccumuloInputFormat.setIterator(job, 3, "org.apache.accumulo.core.iterators.CountingIterator", "Count");
@@ -227,7 +226,7 @@ public class AccumuloInputFormatTest {
@SuppressWarnings("deprecation")
@Test
public void testSetIteratorOption() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue");
Configuration conf = job.getConfiguration();
@@ -241,8 +240,8 @@ public class AccumuloInputFormatTest {
@SuppressWarnings("deprecation")
@Test
public void testGetIteratorOption() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
+ JobContext job = ContextFactory.createJobContext();
+
AccumuloInputFormat.setIteratorOption(job, "iterator1", "key1", "value1");
AccumuloInputFormat.setIteratorOption(job, "iterator2", "key2", "value2");
AccumuloInputFormat.setIteratorOption(job, "iterator3", "key3", "value3");
@@ -272,8 +271,8 @@ public class AccumuloInputFormatTest {
@SuppressWarnings("deprecation")
@Test
public void testSetRegex() {
- JobContext job = new JobContext(new Configuration(), new JobID());
-
+ JobContext job = ContextFactory.createJobContext();
+
String regex = ">\"*%<>\'\\";
AccumuloInputFormat.setRegex(job, RegexType.ROW, regex);
@@ -329,10 +328,9 @@ public class AccumuloInputFormatTest {
Assert.assertEquals(new Authorizations(), risplit.getAuths());
Assert.assertEquals("testmapinstance", risplit.getInstanceName());
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(split, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+ RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
+ Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, split);
reader.initialize(split, context);
mapper.run(context);
}
@@ -350,20 +348,21 @@ public class AccumuloInputFormatTest {
bw.addMutation(m);
}
bw.close();
-
- JobContext job = new JobContext(new Configuration(), new JobID());
+
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
AccumuloInputFormat input = new AccumuloInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
rr.initialize(ris, tac);
TestMapper mapper = new TestMapper();
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris);
+ Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, rr, null, ris);
+ rr.initialize(ris, tac);
while (rr.nextKeyValue()) {
- mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
+ mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), (TestMapper.Context) context);
}
}
@@ -380,15 +379,15 @@ public class AccumuloInputFormatTest {
bw.addMutation(m);
}
bw.close();
-
- JobContext job = new JobContext(new Configuration(), new JobID());
+
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable3", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
final String regex = ".*1.*";
AccumuloInputFormat.setRegex(job, RegexType.ROW, regex);
AccumuloInputFormat input = new AccumuloInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
rr.initialize(ris, tac);
@@ -401,7 +400,7 @@ public class AccumuloInputFormatTest {
@SuppressWarnings("deprecation")
@Test
public void testCorrectRangeInputSplits() throws Exception {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
String username = "user", table = "table", rowRegex = "row.*", colfRegex = "colf.*", colqRegex = "colq.*";
String valRegex = "val.*", instance = "instance";
@@ -485,10 +484,9 @@ public class AccumuloInputFormatTest {
RangeInputSplit emptySplit = new RangeInputSplit();
// Using an empty split should fall back to the information in the Job's Configuration
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+ RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, tac);
+ Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, emptySplit);
reader.initialize(emptySplit, context);
mapper.run(context);
}
@@ -524,10 +522,9 @@ public class AccumuloInputFormatTest {
emptySplit.setPassword("anythingelse".getBytes());
// Using an empty split should fall back to the information in the Job's Configuration
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, emptySplit);
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+ RecordReader<Key,Value> reader = input.createRecordReader(emptySplit, tac);
+ Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, emptySplit);
reader.initialize(emptySplit, context);
mapper.run(context);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
index d9f9da0..5199352 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
@@ -34,14 +34,12 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Test;
public class AccumuloRowInputFormatTest {
@@ -99,12 +97,12 @@ public class AccumuloRowInputFormatTest {
insertList(bw, row3);
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloRowInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations());
AccumuloRowInputFormat.setMockInstance(job.getConfiguration(), "instance1");
AccumuloRowInputFormat crif = new AccumuloRowInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<Text,PeekingIterator<Entry<Key,Value>>> rr = crif.createRecordReader(ris, tac);
rr.initialize(ris, tac);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java
index 8b95c57..87059c1 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java
@@ -18,10 +18,9 @@ package org.apache.accumulo.core.client.mapreduce.lib.partition;
import static org.junit.Assert.assertTrue;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.junit.Test;
public class RangePartitionerTest {
@@ -53,7 +52,7 @@ public class RangePartitionerTest {
}
private RangePartitioner prepPartitioner(int numSubBins) {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
RangePartitioner.setNumSubBins(job, numSubBins);
RangePartitioner rp = new RangePartitioner();
rp.setConf(job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
index 7583494..af03470 100644
--- a/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
+++ b/src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
@@ -20,13 +20,12 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -46,16 +45,15 @@ public class InsertWithOutputFormat extends Configured implements Tool {
}
Text tableName = new Text(args[4]);
Job job = new Job(getConf());
- Configuration conf = job.getConfiguration();
- AccumuloOutputFormat.setZooKeeperInstance(conf, args[0], args[1]);
- AccumuloOutputFormat.setOutputInfo(conf, args[2], args[3].getBytes(), true, null);
+ AccumuloOutputFormat.setZooKeeperInstance(job, args[0], args[1]);
+ AccumuloOutputFormat.setOutputInfo(job, args[3], args[4].getBytes(), true, null);
job.setOutputFormatClass(AccumuloOutputFormat.class);
// when running a mapreduce, you won't need to instantiate the output
// format and record writer
// mapreduce will do that for you, and you will just use
// output.collect(tableName, mutation)
- TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
+ TaskAttemptContext context = ContextFactory.createTaskAttemptContext(job);
RecordWriter<Text,Mutation> rw = new AccumuloOutputFormat().getRecordWriter(context);
Text colf = new Text("colfam");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
index af12302..8937048 100644
--- a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
+++ b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
@@ -37,12 +37,10 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.log4j.Logger;
public class ChunkInputFormatTest extends TestCase {
@@ -87,12 +85,12 @@ public class ChunkInputFormatTest extends TestCase {
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1");
ChunkInputFormat cif = new ChunkInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<List<Entry<Key,Value>>,InputStream> rr = cif.createRecordReader(ris, tac);
rr.initialize(ris, tac);
@@ -138,12 +136,12 @@ public class ChunkInputFormatTest extends TestCase {
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2");
ChunkInputFormat cif = new ChunkInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
crr.initialize(ris, tac);
@@ -177,12 +175,12 @@ public class ChunkInputFormatTest extends TestCase {
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3");
ChunkInputFormat cif = new ChunkInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
crr.initialize(ris, tac);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java b/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
index 64ed42e..b98f29d 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
@@ -257,10 +257,7 @@ public class CoordinateRecoveryTask implements Runnable {
return new RecoveryStatus(logFile.server, logFile.file, (sortJob == null ? 0. : sortJob.mapProgress()), (sortJob == null ? 0.
: sortJob.reduceProgress()), (int) (System.currentTimeMillis() - copyStartTime), (sortJob != null) ? 1. : (copySize == 0 ? 0 : copiedSoFar()
/ (double) copySize));
- } catch (InterruptedException ie) {
- // Hadoop 2.0.2-alpha's Job.mapProgress throws Interrupted Exception. 1.x and 2.0.4 do not.
- return new RecoveryStatus(logFile.server, logFile.file, 1.0, 1.0, (int) (System.currentTimeMillis() - copyStartTime), 1.0);
- } catch (NullPointerException npe) {
+ } catch (Exception e) {
return new RecoveryStatus(logFile.server, logFile.file, 1.0, 1.0, (int) (System.currentTimeMillis() - copyStartTime), 1.0);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a3264e4f/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
index 006d06e..4666331 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;