You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2012/12/13 00:46:49 UTC

svn commit: r1421044 - in /accumulo/trunk/core/src: main/java/org/apache/accumulo/core/client/mapreduce/ main/java/org/apache/accumulo/core/file/rfile/ main/java/org/apache/accumulo/core/util/ test/java/org/apache/accumulo/core/client/mapreduce/

Author: ctubbsii
Date: Wed Dec 12 23:46:48 2012
New Revision: 1421044

URL: http://svn.apache.org/viewvc?rev=1421044&view=rev
Log:
ACCUMULO-467 Change the behavior of AccumuloFileOutputFormat to carry Accumulo properties in an AccumuloConfiguration object, to remove the side-effect behavior of RFileOperations permitting Hadoop configuration to override AccumuloConfiguration in all cases.
ACCUMULO-769 The new methods were added in a way that is consistent with Hadoop's context-oriented MapReduce framework.

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Wed Dec 12 23:46:48 2012
@@ -17,88 +17,232 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * This class allows MapReduce jobs to use the Accumulo data file format for output of data
+ * This class allows MapReduce jobs to write output in the Accumulo data file format.<br />
+ * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files.
  * 
- * The user must specify the output path that does not exist following via static method calls to this class:
- * 
- * AccumuloFileOutputFormat.setOutputPath(job, outputDirectory)
- * 
- * Other methods from FileOutputFormat to configure options are ignored Compression is using the DefaultCodec and is always on
+ * <p>
+ * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from
+ * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using
+ * other Hadoop configuration options that affect the behavior of the underlying files directly in the Job's configuration may work, but is not directly
+ * supported at this time.
  */
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
-  public static final String FILE_TYPE = PREFIX + ".file_type";
+  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + ".";
+  private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties.";
+  
+  /**
+   * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been
+   * stored in the Job's configuration
+   * 
+   * @since 1.5.0
+   */
+  protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) {
+    ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
+    for (Entry<String,String> entry : context.getConfiguration())
+      if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX))
+        acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue());
+    return acuConf;
+  }
   
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
+  /**
+   * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br />
+   * These properties correspond to the supported public static setter methods available to this class.
+   * 
+   * @since 1.5.0
+   */
+  protected static boolean isSupportedAccumuloProperty(Property property) {
+    switch (property) {
+      case TABLE_FILE_COMPRESSION_TYPE:
+      case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
+      case TABLE_FILE_BLOCK_SIZE:
+      case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
+      case TABLE_FILE_REPLICATION:
+        return true;
+      default:
+        return false;
+    }
+  }
+  
+  /**
+   * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
+   * 
+   * @since 1.5.0
+   */
+  protected static <T> void setAccumuloProperty(Job job, Property property, T value) {
+    if (isSupportedAccumuloProperty(property)) {
+      String val = String.valueOf(value);
+      if (property.getType().isValidFormat(val))
+        job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val);
+      else
+        throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
+    } else
+      throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
+  }
+  
+  /**
+   * @param compressionType
+   *          The type of compression to use. One of "none", "gz", "lzo", or "snappy". Specifying a compression may require additional libraries to be available
+   *          to your Job.
+   * @since 1.5.0
+   */
+  public static void setCompressionType(Job job, String compressionType) {
+    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
+  }
+  
+  /**
+   * Sets the size for data blocks within each file.<br />
+   * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group.
+   * 
+   * <p>
+   * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
+   * 
+   * @since 1.5.0
+   */
+  public static void setDataBlockSize(Job job, long dataBlockSize) {
+    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
+  }
+  
+  /**
+   * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system
+   * 
+   * @since 1.5.0
+   */
+  public static void setFileBlockSize(Job job, long fileBlockSize) {
+    setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
+  }
+  
+  /**
+   * Sets the size for index blocks within each file; smaller blocks mean a deeper index hierarchy within the file, while larger blocks mean a more shallow
+   * index hierarchy within the file. This can affect the performance of queries.
+   * 
+   * @since 1.5.0
+   */
+  public static void setIndexBlockSize(Job job, long indexBlockSize) {
+    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
+  }
+  
+  /**
+   * Sets the file system replication factor for the resulting file, overriding the file system default.
+   * 
+   * @since 1.5.0
+   */
+  public static void setReplication(Job job, int replication) {
+    setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication);
+  }
   
   @Override
-  public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+  public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException {
     // get the path of the temporary output file
-    final Configuration conf = job.getConfiguration();
+    final Configuration conf = context.getConfiguration();
+    final AccumuloConfiguration acuConf = getAccumuloConfiguration(context);
     
-    String extension = conf.get(FILE_TYPE);
-    if (extension == null || extension.isEmpty())
-      extension = RFile.EXTENSION;
-    
-    final Path file = this.getDefaultWorkFile(job, "." + extension);
+    final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
+    final Path file = this.getDefaultWorkFile(context, "." + extension);
     
     return new RecordWriter<Key,Value>() {
       FileSKVWriter out = null;
       
       @Override
+      public void close(TaskAttemptContext context) throws IOException {
+        if (out != null)
+          out.close();
+      }
+      
+      @Override
       public void write(Key key, Value value) throws IOException {
         if (out == null) {
-          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, AccumuloConfiguration.getDefaultConfiguration());
+          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);
       }
-      
-      @Override
-      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-        if (out != null)
-          out.close();
-      }
     };
   }
   
-  public static void setFileType(Configuration conf, String type) {
-    conf.set(FILE_TYPE, type);
+  // ----------------------------------------------------------------------------------------------------
+  // Everything below this line is deprecated and should go away in future versions
+  // ----------------------------------------------------------------------------------------------------
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @SuppressWarnings("unused")
+  @Deprecated
+  private static final String FILE_TYPE = PREFIX + "file_type";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @SuppressWarnings("unused")
+  @Deprecated
+  private static final String BLOCK_SIZE = PREFIX + "block_size";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @Deprecated
+  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @Deprecated
+  private static final String INSTANCE_NAME = PREFIX + "instanceName";
+  
+  /**
+   * @deprecated since 1.5.0;
+   */
+  @Deprecated
+  private static final String ZOOKEEPERS = PREFIX + "zooKeepers";
+  
+  /**
+   * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)}
+   */
+  @Deprecated
+  protected static void handleBlockSize(Configuration conf) {
+    conf.setInt("io.seqfile.compress.blocksize",
+        (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
   }
   
   /**
-   * @deprecated since 1.5, use {@link #setCompressedBlockSize(Configuration, long)} instead
+   * @deprecated since 1.5.0; This method does nothing. Only 'rf' type is supported.
+   */
+  @Deprecated
+  public static void setFileType(Configuration conf, String type) {}
+  
+  /**
+   * @deprecated since 1.5.0; Use {@link #setFileBlockSize(Job, long)}, {@link #setDataBlockSize(Job, long)}, or {@link #setIndexBlockSize(Job, long)} instead.
    */
+  @Deprecated
   public static void setBlockSize(Configuration conf, int blockSize) {
-    long bs = blockSize;
-    setCompressedBlockSize(conf, bs);
+    conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize));
   }
   
   /**
-   * @param conf
-   * @param instanceName
-   * @param zooKeepers
+   * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration.
    */
+  @Deprecated
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
     if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
       throw new IllegalStateException("Instance info can only be set once per job");
@@ -110,30 +254,11 @@ public class AccumuloFileOutputFormat ex
   }
   
   /**
-   * @param conf
-   * @return The Accumulo instance.
+   * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration.
    */
+  @Deprecated
   protected static Instance getInstance(Configuration conf) {
     return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
   }
   
-  public static void setReplication(Configuration conf, int replication) {
-    conf.setInt(Property.TABLE_FILE_REPLICATION.getKey(), replication);
-  }
-  
-  public static void setDFSBlockSize(Configuration conf, long blockSize) {
-    conf.setLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), blockSize);
-  }
-  
-  public static void setCompressedBlockSize(Configuration conf, long cblockSize) {
-    conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), cblockSize);
-  }
-  
-  public static void setCompressedBlockSizeIndex(Configuration conf, long cblockSizeIndex) {
-    conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), cblockSizeIndex);
-  }
-  
-  public static void setCompressionType(Configuration conf, String compression) {
-    conf.set(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression);
-  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java Wed Dec 12 23:46:48 2012
@@ -104,23 +104,22 @@ public class RFileOperations extends Fil
   @Override
   public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
     int hrep = conf.getInt("dfs.replication", -1);
-    int trep = conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
     int rep = hrep;
     if (trep > 0 && trep != hrep) {
       rep = trep;
     }
     long hblock = conf.getLong("dfs.block.size", 1 << 26);
-    long tblock = conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
     long block = hblock;
     if (tblock > 0)
       block = tblock;
     int bufferSize = conf.getInt("io.file.buffer.size", 4096);
     
-    long blockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
-    long indexBlockSize = conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
-        acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+    long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
     
-    String compression = conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+    String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
     
     CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
     Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java Wed Dec 12 23:46:48 2012
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTarge
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MapContext;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 public class ContextFactory {
   
+  private static final Constructor<?> JOB_CONSTRUCTOR;
   private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
   private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
   private static final Constructor<?> TASK_ID_CONSTRUCTOR;
@@ -58,6 +60,7 @@ public class ContextFactory {
       v21 = false;
     }
     useV21 = v21;
+    Class<?> jobCls;
     Class<?> jobContextCls;
     Class<?> taskContextCls;
     Class<?> mapCls;
@@ -65,6 +68,7 @@ public class ContextFactory {
     Class<?> innerMapContextCls;
     try {
       if (v21) {
+        jobCls = Class.forName(PACKAGE + ".Job");
         jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
         taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
         TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
@@ -72,6 +76,7 @@ public class ContextFactory {
         mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
         innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
       } else {
+        jobCls = Class.forName(PACKAGE + ".Job");
         jobContextCls = Class.forName(PACKAGE + ".JobContext");
         taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
         TASK_TYPE_CLASS = null;
@@ -83,6 +88,7 @@ public class ContextFactory {
       throw new IllegalArgumentException("Can't find class", e);
     }
     try {
+      JOB_CONSTRUCTOR = jobCls.getConstructor(Configuration.class, String.class);
       JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
       JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
       TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
@@ -111,6 +117,22 @@ public class ContextFactory {
     }
   }
   
+  public static Job createJob() {
+    return createJob(new Configuration());
+  }
+  
+  public static Job createJob(Configuration conf) {
+    try {
+      return (Job) JOB_CONSTRUCTOR.newInstance(conf, new JobID("local", 0).toString());
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    }
+  }
+  
   public static JobContext createJobContext() {
     return createJobContext(new Configuration());
   }
@@ -152,16 +174,19 @@ public class ContextFactory {
     return createMapContext(m, tac, reader, writer, null, null, split);
   }
   
-  @SuppressWarnings({"unchecked", "rawtypes"})
   public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
       RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
     try {
       if (useV21) {
         Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
+        @SuppressWarnings("unchecked")
+        Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(MAP_CONSTRUCTOR.newInstance(), basis);
+        return newInstance;
       } else {
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
-            split);
+        @SuppressWarnings("unchecked")
+        Mapper<K1,V1,K2,V2>.Context newInstance = (Mapper<K1,V1,K2,V2>.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(),
+            tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
+        return newInstance;
       }
     } catch (InstantiationException e) {
       throw new IllegalArgumentException("Can't create object", e);

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1421044&r1=1421043&r2=1421044&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Wed Dec 12 23:46:48 2012
@@ -27,9 +27,8 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.ContextFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.After;
@@ -37,13 +36,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloFileOutputFormatTest {
-  static JobContext job;
+  static Job job;
   static TaskAttemptContext tac;
   static Path f = null;
   
   @Before
   public void setup() {
-    job = ContextFactory.createJobContext();
+    job = ContextFactory.createJob();
     
     Path file = new Path("target/");
     f = new Path(file, "_temporary");
@@ -89,8 +88,6 @@ public class AccumuloFileOutputFormatTes
   
   @Test
   public void validateConfiguration() throws IOException, InterruptedException {
-    Configuration conf = job.getConfiguration();
-    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
     
     int a = 7;
     long b = 300l;
@@ -98,17 +95,18 @@ public class AccumuloFileOutputFormatTes
     long d = 10l;
     String e = "type";
     
-    AccumuloFileOutputFormat.setReplication(conf, a);
-    AccumuloFileOutputFormat.setDFSBlockSize(conf, b);
-    AccumuloFileOutputFormat.setCompressedBlockSize(conf, c);
-    AccumuloFileOutputFormat.setCompressedBlockSizeIndex(conf, d);
-    AccumuloFileOutputFormat.setCompressionType(conf, e);
-    
-    assertEquals(a, conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(), acuconf.getCount(Property.TABLE_FILE_REPLICATION)));
-    assertEquals(b, conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)));
-    assertEquals(c, conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)));
-    assertEquals(d,
-        conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)));
-    assertEquals(e, conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)));
+    AccumuloFileOutputFormat.setReplication(job, a);
+    AccumuloFileOutputFormat.setFileBlockSize(job, b);
+    AccumuloFileOutputFormat.setDataBlockSize(job, c);
+    AccumuloFileOutputFormat.setIndexBlockSize(job, d);
+    AccumuloFileOutputFormat.setCompressionType(job, e);
+    
+    AccumuloConfiguration acuconf = AccumuloFileOutputFormat.getAccumuloConfiguration(job);
+    
+    assertEquals(a, acuconf.getCount(Property.TABLE_FILE_REPLICATION));
+    assertEquals(b, acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
+    assertEquals(c, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+    assertEquals(d, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    assertEquals(e, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
   }
 }