You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bi...@apache.org on 2012/01/10 16:27:58 UTC
svn commit: r1229596 - in /incubator/accumulo/trunk/src:
core/src/main/java/org/apache/accumulo/core/util/
core/src/test/java/org/apache/accumulo/core/client/mapreduce/
core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/
example...
Author: billie
Date: Tue Jan 10 15:27:57 2012
New Revision: 1229596
URL: http://svn.apache.org/viewvc?rev=1229596&view=rev
Log:
ACCUMULO-286 introduced ContextFactory to assist with testing InputFormats and OutputFormats under different versions of hadoop
Added:
incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java (with props)
Modified:
incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java
incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithOutputFormat.java
incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatTest.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
Added: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java?rev=1229596&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java (added)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java Tue Jan 10 15:27:57 2012
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A factory to allow applications to deal with inconsistencies between MapReduce Context Objects API between hadoop-0.20 and later versions. This code is based
+ * on org.apache.hadoop.mapreduce.ContextFactory in hadoop-mapred-0.22.0.
+ */
+public class ContextFactory {
+
+ private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> TASK_ID_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+ private static final Class<?> TASK_TYPE_CLASS;
+ private static final boolean useV21;
+
+ static {
+ boolean v21 = true;
+ final String PACKAGE = "org.apache.hadoop.mapreduce";
+ try {
+ Class.forName(PACKAGE + ".task.JobContextImpl");
+ } catch (ClassNotFoundException cnfe) {
+ v21 = false;
+ }
+ useV21 = v21;
+ Class<?> jobContextCls;
+ Class<?> taskContextCls;
+ Class<?> mapCls;
+ Class<?> mapContextCls;
+ Class<?> innerMapContextCls;
+ try {
+ if (v21) {
+ jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
+ taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
+ TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
+ mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+ mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+ innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
+ } else {
+ jobContextCls = Class.forName(PACKAGE + ".JobContext");
+ taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
+ TASK_TYPE_CLASS = null;
+ mapContextCls = Class.forName(PACKAGE + ".MapContext");
+ mapCls = Class.forName(PACKAGE + ".Mapper");
+ innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Can't find class", e);
+ }
+ try {
+ JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
+ JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
+ TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ if (useV21) {
+ TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, TASK_TYPE_CLASS, int.class, int.class);
+ TASK_ID_CONSTRUCTOR.setAccessible(true);
+ MAP_CONSTRUCTOR = mapCls.getConstructor();
+ MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, MapContext.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR = mapContextCls.getDeclaredConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+ OutputCommitter.class, StatusReporter.class, InputSplit.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+ } else {
+ TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, boolean.class, int.class, int.class);
+ TASK_ID_CONSTRUCTOR.setAccessible(true);
+ MAP_CONSTRUCTOR = null;
+ MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+ OutputCommitter.class, StatusReporter.class, InputSplit.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+ }
+ MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ } catch (SecurityException e) {
+ throw new IllegalArgumentException("Can't run constructor ", e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("Can't find constructor ", e);
+ }
+ }
+
+ public static JobContext createJobContext() {
+ return createJobContext(new Configuration());
+ }
+
+ public static JobContext createJobContext(Configuration conf) {
+ try {
+ return (JobContext) JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, new JobID("local", 0));
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+
+ public static TaskAttemptContext createTaskAttemptContext(JobContext job) {
+ return createTaskAttemptContext(job.getConfiguration());
+ }
+
+ public static TaskAttemptContext createTaskAttemptContext(Configuration conf) {
+ try {
+ if (useV21)
+ return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf,
+ TASK_ID_CONSTRUCTOR.newInstance("local", 0, TASK_TYPE_CLASS.getEnumConstants()[0], 0, 0));
+ else
+ return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, TASK_ID_CONSTRUCTOR.newInstance("local", 0, true, 0, 0));
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
+ RecordWriter<K2,V2> writer, InputSplit split) {
+ try {
+ if (useV21) {
+ Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null, split);
+ return (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
+ } else {
+ return (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, null, null,
+ split);
+ }
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+}
Propchange: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Tue Jan 10 15:27:57 2012
@@ -26,13 +26,11 @@ import org.apache.accumulo.core.conf.Acc
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,13 +42,13 @@ public class AccumuloFileOutputFormatTes
@Before
public void setup() {
- job = new JobContext(new Configuration(), new JobID());
+ job = ContextFactory.createJobContext();
Path file = new Path(System.getenv("ACCUMULO_HOME") + "/target/");
f = new Path(file, "_temporary");
job.getConfiguration().set("mapred.output.dir", file.toString());
- tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ tac = ContextFactory.createTaskAttemptContext(job);
}
@After
Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Tue Jan 10 15:27:57 2012
@@ -35,16 +35,15 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.After;
import org.junit.Test;
@@ -61,7 +60,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testMaxVersions() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1);
int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration());
assertEquals(1, version);
@@ -75,7 +74,7 @@ public class AccumuloInputFormatTest {
*/
@Test(expected = IOException.class)
public void testMaxVersionsLessThan1() throws IOException {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0);
}
@@ -84,7 +83,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testNoMaxVersion() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConfiguration()));
}
@@ -93,7 +92,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testSetIterator() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
Configuration conf = job.getConfiguration();
@@ -103,7 +102,7 @@ public class AccumuloInputFormatTest {
@Test
public void testAddIterator() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
@@ -184,7 +183,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testGetIteratorSettings() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
AccumuloInputFormat.addIterator(job.getConfiguration(), new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
@@ -215,7 +214,7 @@ public class AccumuloInputFormatTest {
@Test
public void testSetRegex() {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
String regex = ">\"*%<>\'\\";
@@ -267,10 +266,9 @@ public class AccumuloInputFormatTest {
TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
for (InputSplit split : splits) {
- TaskAttemptID id = new TaskAttemptID();
- TaskAttemptContext attempt = new TaskAttemptContext(job.getConfiguration(), id);
- RecordReader<Key,Value> reader = input.createRecordReader(split, attempt);
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), id, reader, null, null, null, split);
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
+ RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
+ Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, split);
reader.initialize(split, context);
mapper.run(context);
}
@@ -289,19 +287,20 @@ public class AccumuloInputFormatTest {
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
AccumuloInputFormat input = new AccumuloInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
rr.initialize(ris, tac);
TestMapper mapper = new TestMapper();
- Mapper<Key,Value,Key,Value>.Context context = mapper.new Context(job.getConfiguration(), tac.getTaskAttemptID(), rr, null, null, null, ris);
+ Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, rr, null, ris);
+ rr.initialize(ris, tac);
while (rr.nextKeyValue()) {
- mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), context);
+ mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), (TestMapper.Context) context);
}
}
}
Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java Tue Jan 10 15:27:57 2012
@@ -35,14 +35,12 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.accumulo.core.util.PeekingIterator;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Test;
public class AccumuloRowInputFormatTest {
@@ -100,12 +98,12 @@ public class AccumuloRowInputFormatTest
insertList(bw, row3);
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
AccumuloRowInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations());
AccumuloRowInputFormat.setMockInstance(job.getConfiguration(), "instance1");
AccumuloRowInputFormat crif = new AccumuloRowInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<Text,PeekingIterator<Entry<Key,Value>>> rr = crif.createRecordReader(ris, tac);
rr.initialize(ris, tac);
Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitionerTest.java Tue Jan 10 15:27:57 2012
@@ -18,11 +18,9 @@ package org.apache.accumulo.core.client.
import static org.junit.Assert.assertTrue;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.junit.Test;
public class RangePartitionerTest {
@@ -54,7 +52,7 @@ public class RangePartitionerTest {
}
private RangePartitioner prepPartitioner(int numSubBins) {
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
RangePartitioner.setNumSubBins(job, numSubBins);
RangePartitioner rp = new RangePartitioner();
rp.setConf(job.getConfiguration());
Modified: incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithOutputFormat.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithOutputFormat.java (original)
+++ incubator/accumulo/trunk/src/examples/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithOutputFormat.java Tue Jan 10 15:27:57 2012
@@ -20,13 +20,12 @@ import org.apache.accumulo.core.client.m
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -43,7 +42,6 @@ public class InsertWithOutputFormat exte
}
Text tableName = new Text(args[2]);
Job job = new Job(getConf());
- Configuration conf = job.getConfiguration();
AccumuloOutputFormat.setZooKeeperInstance(job, args[0], args[1]);
AccumuloOutputFormat.setOutputInfo(job, args[3], args[4].getBytes(), true, null);
job.setOutputFormatClass(AccumuloOutputFormat.class);
@@ -52,7 +50,7 @@ public class InsertWithOutputFormat exte
// format and record writer
// mapreduce will do that for you, and you will just use
// output.collect(tableName, mutation)
- TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
+ TaskAttemptContext context = ContextFactory.createTaskAttemptContext(job);
RecordWriter<Text,Mutation> rw = new AccumuloOutputFormat().getRecordWriter(context);
Text colf = new Text("colfam");
Modified: incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatTest.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatTest.java (original)
+++ incubator/accumulo/trunk/src/examples/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatTest.java Tue Jan 10 15:27:57 2012
@@ -37,12 +37,10 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.core.util.ContextFactory;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.log4j.Logger;
public class ChunkInputFormatTest extends TestCase {
@@ -87,12 +85,12 @@ public class ChunkInputFormatTest extend
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1");
ChunkInputFormat cif = new ChunkInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<List<Entry<Key,Value>>,InputStream> rr = cif.createRecordReader(ris, tac);
rr.initialize(ris, tac);
@@ -138,12 +136,12 @@ public class ChunkInputFormatTest extend
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2");
ChunkInputFormat cif = new ChunkInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
crr.initialize(ris, tac);
@@ -177,12 +175,12 @@ public class ChunkInputFormatTest extend
}
bw.close();
- JobContext job = new JobContext(new Configuration(), new JobID());
+ JobContext job = ContextFactory.createJobContext();
ChunkInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3");
ChunkInputFormat cif = new ChunkInputFormat();
RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
RecordReader<List<Entry<Key,Value>>,InputStream> crr = cif.createRecordReader(ris, tac);
crr.initialize(ris, tac);
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/CoordinateRecoveryTask.java Tue Jan 10 15:27:57 2012
@@ -241,7 +241,7 @@ public class CoordinateRecoveryTask impl
return new RecoveryStatus(logFile.server, logFile.file, (sortJob == null ? 0. : sortJob.mapProgress()), (sortJob == null ? 0.
: sortJob.reduceProgress()), (int) (System.currentTimeMillis() - copyStartTime), (sortJob != null) ? 1. : (copySize == 0 ? 0 : copiedSoFar()
/ (double) copySize));
- } catch (NullPointerException npe) {
+ } catch (Exception e) {
return new RecoveryStatus(logFile.server, logFile.file, 1.0, 1.0, (int) (System.currentTimeMillis() - copyStartTime), 1.0);
}
}
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java?rev=1229596&r1=1229595&r2=1229596&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/LogSort.java Tue Jan 10 15:27:57 2012
@@ -31,9 +31,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
@@ -81,8 +81,8 @@ public class LogSort extends Configured
@Override
public void abortTask(TaskAttemptContext context) {
- super.abortTask(context);
try {
+ super.abortTask(context);
outputFileSystem.delete(outputPath, true);
} catch (IOException ex) {
throw new RuntimeException(ex);