Posted to commits@accumulo.apache.org by ct...@apache.org on 2012/12/13 00:46:49 UTC
svn commit: r1421044 - in /accumulo/trunk/core/src:
main/java/org/apache/accumulo/core/client/mapreduce/
main/java/org/apache/accumulo/core/file/rfile/
main/java/org/apache/accumulo/core/util/
test/java/org/apache/accumulo/core/client/mapreduce/
Author: ctubbsii
Date: Wed Dec 12 23:46:48 2012
New Revision: 1421044
URL: http://svn.apache.org/viewvc?rev=1421044&view=rev
Log:
ACCUMULO-467 Change the behavior of AccumuloFileOutputFormat to carry Accumulo properties in an AccumuloConfiguration object, removing the side effect of RFileOperations permitting the Hadoop configuration to override AccumuloConfiguration in all cases.
ACCUMULO-769 The new methods were added in a manner consistent with Hadoop's context-oriented MapReduce framework.
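For illustration, a minimal job setup against the new context-oriented API might look like the following sketch (the class name, job name, output path, and property values are hypothetical, not part of this commit):

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkIngestSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "bulk-ingest-example");
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setOutputKeyClass(Key.class);
        job.setOutputValueClass(Value.class);
        // the output path must not already exist
        AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-output"));
        // per-job overrides of the supported Accumulo table.file.* properties
        AccumuloFileOutputFormat.setCompressionType(job, "gz");
        AccumuloFileOutputFormat.setDataBlockSize(job, 100 * 1024);
        AccumuloFileOutputFormat.setIndexBlockSize(job, 128 * 1024);
        AccumuloFileOutputFormat.setReplication(job, 3);
      }
    }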
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Wed Dec 12 23:46:48 2012
@@ -17,88 +17,232 @@
package org.apache.accumulo.core.client.mapreduce;
import java.io.IOException;
+import java.util.Map.Entry;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
- * This class allows MapReduce jobs to use the Accumulo data file format for output of data
+ * This class allows MapReduce jobs to write output in the Accumulo data file format.<br />
+ * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files.
*
- * The user must specify the output path that does not exist following via static method calls to this class:
- *
- * AccumuloFileOutputFormat.setOutputPath(job, outputDirectory)
- *
- * Other methods from FileOutputFormat to configure options are ignored Compression is using the DefaultCodec and is always on
+ * <p>
+ * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from
+ * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using
+ * other Hadoop configuration options that affect the behavior of the underlying files directly in the Job's configuration may work, but this is not directly
+ * supported at this time.
*/
public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
- private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
- public static final String FILE_TYPE = PREFIX + ".file_type";
+ private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + ".";
+ private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties.";
+
+ /**
+ * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults and overridden with any Accumulo properties that have
+ * been stored in the Job's configuration.
+ *
+ * @since 1.5.0
+ */
+ protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) {
+ ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
+ for (Entry<String,String> entry : context.getConfiguration())
+ if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX))
+ acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue());
+ return acuConf;
+ }
- private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
- private static final String INSTANCE_NAME = PREFIX + ".instanceName";
- private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
+ /**
+ * Checks whether the given property is one of the supported Accumulo properties set by this OutputFormat, which change the behavior of the RecordWriter.<br />
+ * These properties correspond to the supported public static setter methods available to this class.
+ *
+ * @since 1.5.0
+ */
+ protected static boolean isSupportedAccumuloProperty(Property property) {
+ switch (property) {
+ case TABLE_FILE_COMPRESSION_TYPE:
+ case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
+ case TABLE_FILE_BLOCK_SIZE:
+ case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
+ case TABLE_FILE_REPLICATION:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
+ *
+ * @since 1.5.0
+ */
+ protected static <T> void setAccumuloProperty(Job job, Property property, T value) {
+ if (isSupportedAccumuloProperty(property)) {
+ String val = String.valueOf(value);
+ if (property.getType().isValidFormat(val))
+ job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val);
+ else
+ throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
+ } else
+ throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
+ }
+
+ /**
+ * @param compressionType
+ * The type of compression to use. One of "none", "gz", "lzo", or "snappy". Specifying a compression may require additional libraries to be available
+ * to your Job.
+ * @since 1.5.0
+ */
+ public static void setCompressionType(Job job, String compressionType) {
+ setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
+ }
+
+ /**
+ * Sets the size for data blocks within each file.<br />
+ * Data blocks are spans of key/value pairs stored in the file that are compressed and indexed as a group.
+ *
+ * <p>
+ * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
+ *
+ * @since 1.5.0
+ */
+ public static void setDataBlockSize(Job job, long dataBlockSize) {
+ setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
+ }
+
+ /**
+ * Sets the size for file blocks in the file system; file blocks are managed and replicated by the underlying file system.
+ *
+ * @since 1.5.0
+ */
+ public static void setFileBlockSize(Job job, long fileBlockSize) {
+ setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
+ }
+
+ /**
+ * Sets the size for index blocks within each file; smaller blocks mean a deeper index hierarchy within the file, while larger blocks mean a shallower
+ * index hierarchy within the file. This can affect the performance of queries.
+ *
+ * @since 1.5.0
+ */
+ public static void setIndexBlockSize(Job job, long indexBlockSize) {
+ setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
+ }
+
+ /**
+ * Sets the file system replication factor for the resulting file, overriding the file system default.
+ *
+ * @since 1.5.0
+ */
+ public static void setReplication(Job job, int replication) {
+ setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication);
+ }
@Override
- public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+ public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
// get the path of the temporary output file
- final Configuration conf = job.getConfiguration();
+ final Configuration conf = context.getConfiguration();
+ final AccumuloConfiguration acuConf = getAccumuloConfiguration(context);
- String extension = conf.get(FILE_TYPE);
- if (extension == null || extension.isEmpty())
- extension = RFile.EXTENSION;
-
- final Path file = this.getDefaultWorkFile(job, "." + extension);
+ final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
+ final Path file = this.getDefaultWorkFile(context, "." + extension);
return new RecordWriter<Key,Value>() {
FileSKVWriter out = null;
@Override
+ public void close(TaskAttemptContext context) throws IOException {
+ if (out != null)
+ out.close();
+ }
+
+ @Override
public void write(Key key, Value value) throws IOException {
if (out == null) {
- out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, AccumuloConfiguration.getDefaultConfiguration());
+ out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
out.startDefaultLocalityGroup();
}
out.append(key, value);
}
-
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- if (out != null)
- out.close();
- }
};
}
- public static void setFileType(Configuration conf, String type) {
- conf.set(FILE_TYPE, type);
+ // ----------------------------------------------------------------------------------------------------
+ // Everything below this line is deprecated and should go away in future versions
+ // ----------------------------------------------------------------------------------------------------
+
+ /**
+ * @deprecated since 1.5.0;
+ */
+ @SuppressWarnings("unused")
+ @Deprecated
+ private static final String FILE_TYPE = PREFIX + "file_type";
+
+ /**
+ * @deprecated since 1.5.0;
+ */
+ @SuppressWarnings("unused")
+ @Deprecated
+ private static final String BLOCK_SIZE = PREFIX + "block_size";
+
+ /**
+ * @deprecated since 1.5.0;
+ */
+ @Deprecated
+ private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured";
+
+ /**
+ * @deprecated since 1.5.0;
+ */
+ @Deprecated
+ private static final String INSTANCE_NAME = PREFIX + "instanceName";
+
+ /**
+ * @deprecated since 1.5.0;
+ */
+ @Deprecated
+ private static final String ZOOKEEPERS = PREFIX + "zooKeepers";
+
+ /**
+ * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)}
+ */
+ @Deprecated
+ protected static void handleBlockSize(Configuration conf) {
+ conf.setInt("io.seqfile.compress.blocksize",
+ (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
}
/**
- * @deprecated since 1.5, use {@link #setCompressedBlockSize(Configuration, long)} instead
+ * @deprecated since 1.5.0; This method does nothing. Only 'rf' type is supported.
+ */
+ @Deprecated
+ public static void setFileType(Configuration conf, String type) {}
+
+ /**
+ * @deprecated since 1.5.0; Use {@link #setFileBlockSize(Job, long)}, {@link #setDataBlockSize(Job, long)}, or {@link #setIndexBlockSize(Job, long)} instead.
*/
+ @Deprecated
public static void setBlockSize(Configuration conf, int blockSize) {
- long bs = blockSize;
- setCompressedBlockSize(conf, bs);
+ conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize));
}
/**
- * @param conf
- * @param instanceName
- * @param zooKeepers
+ * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration.
*/
+ @Deprecated
public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
throw new IllegalStateException("Instance info can only be set once per job");
@@ -110,30 +254,11 @@ public class AccumuloFileOutputFormat ex
}
/**
- * @param conf
- * @return The Accumulo instance.
+ * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration.
*/
+ @Deprecated
protected static Instance getInstance(Configuration conf) {
return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
}
- public static void setReplication(Configuration conf, int replication) {
- conf.setInt(Property.TABLE_FILE_REPLICATION.getKey(), replication);
- }
-
- public static void setDFSBlockSize(Configuration conf, long blockSize) {
- conf.setLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), blockSize);
- }
-
- public static void setCompressedBlockSize(Configuration conf, long cblockSize) {
- conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), cblockSize);
- }
-
- public static void setCompressedBlockSizeIndex(Configuration conf, long cblockSizeIndex) {
- conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), cblockSizeIndex);
- }
-
- public static void setCompressionType(Configuration conf, String compression) {
- conf.set(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression);
- }
}
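The protected getAccumuloConfiguration(JobContext) helper also lets a subclass inspect the effective settings before delegating to the parent RecordWriter. A minimal sketch, assuming a hypothetical subclass:

    import java.io.IOException;
    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.conf.Property;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class AuditingFileOutputFormat extends AccumuloFileOutputFormat {
      @Override
      public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
        // defaults apply unless a setter stored an override in the Job's configuration
        AccumuloConfiguration acuConf = getAccumuloConfiguration(context);
        long dataBlockSize = acuConf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
        System.out.println("effective data block size: " + dataBlockSize);
        return super.getRecordWriter(context);
      }
    }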
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java Wed Dec 12 23:46:48 2012
@@ -104,23 +104,22 @@ public class RFileOperations extends Fil
@Override
public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
int hrep = conf.getInt("dfs.replication", -1);
- int trep = conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+ int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
int rep = hrep;
if (trep > 0 && trep != hrep) {
rep = trep;
}
long hblock = conf.getLong("dfs.block.size", 1 << 26);
- long tblock = conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+ long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
long block = hblock;
if (tblock > 0)
block = tblock;
int bufferSize = conf.getInt("io.file.buffer.size", 4096);
- long blockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
- long indexBlockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
- acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+ long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+ long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
- String compression = conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+ String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
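With the Hadoop-configuration overrides removed, code that opens a writer directly must carry any non-default file settings in the AccumuloConfiguration argument rather than the Hadoop Configuration. A minimal sketch under that assumption (the output path and compression value are illustrative):

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.conf.ConfigurationCopy;
    import org.apache.accumulo.core.conf.Property;
    import org.apache.accumulo.core.file.FileOperations;
    import org.apache.accumulo.core.file.FileSKVWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class DirectWriterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // start from the Accumulo defaults and override only what is needed
        ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
        acuConf.set(Property.TABLE_FILE_COMPRESSION_TYPE, "gz");
        FileSKVWriter out = FileOperations.getInstance().openWriter("/tmp/example.rf", fs, conf, acuConf);
        out.startDefaultLocalityGroup();
        // append key/value pairs in sorted order here, then close
        out.close();
      }
    }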
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java Wed Dec 12 23:46:48 2012
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTarge
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapContext;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskA
*/
public class ContextFactory {
+ private static final Constructor<?> JOB_CONSTRUCTOR;
private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
private static final Constructor<?> TASK_ID_CONSTRUCTOR;
@@ -58,6 +60,7 @@ public class ContextFactory {
v21 = false;
}
useV21 = v21;
+ Class<?> jobCls;
Class<?> jobContextCls;
Class<?> taskContextCls;
Class<?> mapCls;
@@ -65,6 +68,7 @@ public class ContextFactory {
Class<?> innerMapContextCls;
try {
if (v21) {
+ jobCls = Class.forName(PACKAGE + ".Job");
jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
@@ -72,6 +76,7 @@ public class ContextFactory {
mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
} else {
+ jobCls = Class.forName(PACKAGE + ".Job");
jobContextCls = Class.forName(PACKAGE + ".JobContext");
taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
TASK_TYPE_CLASS = null;
@@ -83,6 +88,7 @@ public class ContextFactory {
throw new IllegalArgumentException("Can't find class", e);
}
try {
+ JOB_CONSTRUCTOR = jobCls.getConstructor(Configuration.class, String.class);
JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
@@ -111,6 +117,22 @@ public class ContextFactory {
}
}
+ public static Job createJob() {
+ return createJob(new Configuration());
+ }
+
+ public static Job createJob(Configuration conf) {
+ try {
+ return (Job) JOB_CONSTRUCTOR.newInstance(conf, new JobID("local", 0).toString());
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't create object", e);
+ }
+ }
+
public static JobContext createJobContext() {
return createJobContext(new Configuration());
}
@@ -152,16 +174,19 @@ public class ContextFactory {
return createMapContext(m, tac, reader, writer, null, null, split);
}
- @SuppressWarnings({"unchecked", "rawtypes"})
public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
try {
if (useV21) {
Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
- return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
+ @SuppressWarnings("unchecked")
+ Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(MAP_CONSTRUCTOR.newInstance(), basis);
+ return newInstance;
} else {
- return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
- split);
+ @SuppressWarnings("unchecked")
+ Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(),
+ tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
+ return newInstance;
}
} catch (InstantiationException e) {
throw new IllegalArgumentException("Can't create object", e);
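The new createJob() factory mirrors the existing createJobContext() helpers and returns a Job that works against both the old and new MapReduce context APIs via reflection. Typical usage, mirroring the updated test below (the compression value is illustrative; getAccumuloConfiguration is protected, so this only compiles from the same package or a subclass):

    Job job = ContextFactory.createJob();
    AccumuloFileOutputFormat.setCompressionType(job, "gz");
    // a Job is a JobContext, so the stored properties can be read back
    AccumuloConfiguration acuConf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);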
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Wed Dec 12 23:46:48 2012
@@ -27,9 +27,8 @@ import org.apache.accumulo.core.conf.Pro
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.ContextFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.After;
@@ -37,13 +36,13 @@ import org.junit.Before;
import org.junit.Test;
public class AccumuloFileOutputFormatTest {
- static JobContext job;
+ static Job job;
static TaskAttemptContext tac;
static Path f = null;
@Before
public void setup() {
- job = ContextFactory.createJobContext();
+ job = ContextFactory.createJob();
Path file = new Path("target/");
f = new Path(file, "_temporary");
@@ -89,8 +88,6 @@ public class AccumuloFileOutputFormatTes
@Test
public void validateConfiguration() throws IOException, InterruptedException {
- Configuration conf = job.getConfiguration();
- AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
int a = 7;
long b = 300l;
@@ -98,17 +95,18 @@ public class AccumuloFileOutputFormatTes
long d = 10l;
String e = "type";
- AccumuloFileOutputFormat.setReplication(conf, a);
- AccumuloFileOutputFormat.setDFSBlockSize(conf, b);
- AccumuloFileOutputFormat.setCompressedBlockSize(conf, c);
- AccumuloFileOutputFormat.setCompressedBlockSizeIndex(conf, d);
- AccumuloFileOutputFormat.setCompressionType(conf, e);
-
- assertEquals(a, conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION)));
- assertEquals(b, conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)));
- assertEquals(c, conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)));
- assertEquals(d,
- conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)));
- assertEquals(e, conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)));
+ AccumuloFileOutputFormat.setReplication(job, a);
+ AccumuloFileOutputFormat.setFileBlockSize(job, b);
+ AccumuloFileOutputFormat.setDataBlockSize(job, c);
+ AccumuloFileOutputFormat.setIndexBlockSize(job, d);
+ AccumuloFileOutputFormat.setCompressionType(job, e);
+
+ AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
+
+ assertEquals(a, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+ assertEquals(b, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+ assertEquals(c, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+ assertEquals(d, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+ assertEquals(e, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
}
}