You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [11/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java Thu Jan 21 10:37:58 2010
@@ -18,82 +18,85 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.io.IOException;
+import java.io.DataInputStream;
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
-import java.io.DataInputStream;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
-
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
-
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 
-/** An {@link org.apache.hadoop.mapred.InputFormat} for Plain files with {@link Deserializer} records */
-public class FlatFileInputFormat<T> extends FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for Plain files with
+ * {@link Deserializer} records
+ */
+public class FlatFileInputFormat<T> extends
+    FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
 
   /**
-   * A work-around until HADOOP-1230 is fixed. 
-   *
-   * Allows boolean next(k,v) to be called by reference but still allow the deserializer to create a new
-   * object (i.e., row) on every call to next.
+   * A work-around until HADOOP-1230 is fixed.
+   * 
+   * Allows boolean next(k,v) to be called by reference but still allow the
+   * deserializer to create a new object (i.e., row) on every call to next.
    */
   static public class RowContainer<T> {
     T row;
   }
 
   /**
-   * An implementation of SerializationContext is responsible for looking up the Serialization implementation
-   * for the given RecordReader. Potentially based on the Configuration or some other mechanism
-   *
-   * The SerializationFactory does not give this functionality since:
-   *  1. Requires Serialization implementations to be specified in the Configuration a-priori (although same as setting
-   *     a SerializationContext)
-   *  2. Does not lookup the actual subclass being deserialized. e.g., for Serializable does not have a way of  configuring
-   *      the actual Java class being serialized/deserialized.
+   * An implementation of SerializationContext is responsible for looking up the
+   * Serialization implementation for the given RecordReader. Potentially based
+   * on the Configuration or some other mechanism
+   * 
+   * The SerializationFactory does not give this functionality since: (1) it
+   * requires Serialization implementations to be specified in the
+   * Configuration a-priori (although same as setting a SerializationContext);
+   * (2) it does not look up the actual subclass being deserialized, e.g., for
+   * Serializable it does not have a way of configuring the actual Java class
+   * being serialized/deserialized.
    */
   static public interface SerializationContext<S> extends Configurable {
 
     /**
-     *  An {@link Serialization} object for objects of type S
+     * An {@link Serialization} object for objects of type S
+     * 
      * @return a serialization object for this context
      */
     public Serialization<S> getSerialization() throws IOException;
 
     /**
-     *  Produces the specific class to deserialize
+     * Produces the specific class to deserialize
      */
     public Class<? extends S> getRealClass() throws IOException;
   }
-  
+
   /**
    * The JobConf keys for the Serialization implementation
    */
   static public final String SerializationImplKey = "mapred.input.serialization.implKey";
 
   /**
-   *  An implementation of {@link SerializationContext} that reads the Serialization class and 
-   *  specific subclass to be deserialized from the JobConf.
-   *
+   * An implementation of {@link SerializationContext} that reads the
+   * Serialization class and specific subclass to be deserialized from the
+   * JobConf.
+   * 
    */
-  static public class SerializationContextFromConf<S> implements FlatFileInputFormat.SerializationContext<S> {
+  static public class SerializationContextFromConf<S> implements
+      FlatFileInputFormat.SerializationContext<S> {
 
     /**
      * The JobConf keys for the Class that is being deserialized.
@@ -101,57 +104,67 @@
     static public final String SerializationSubclassKey = "mapred.input.serialization.subclassKey";
 
     /**
-     * Implements configurable so it can use the configuration to find the right classes
-     * Note: ReflectionUtils will automatigically call setConf with the right configuration.
+     * Implements configurable so it can use the configuration to find the right
+     * classes. Note: ReflectionUtils will automatically call setConf with the
+     * right configuration.
      */
     private Configuration conf;
 
-    public void setConf(Configuration conf) { 
-      this.conf = conf; 
+    public void setConf(Configuration conf) {
+      this.conf = conf;
     }
 
-    public Configuration getConf() { 
-      return conf; 
+    public Configuration getConf() {
+      return conf;
     }
 
     /**
      * @return the actual class being deserialized
-     * @exception does not currently throw IOException
+     * @exception IOException
+     *              not currently thrown by this implementation
      */
     public Class<S> getRealClass() throws IOException {
-      return (Class<S>)conf.getClass(SerializationSubclassKey, null, Object.class);
+      return (Class<S>) conf.getClass(SerializationSubclassKey, null,
+          Object.class);
     }
 
     /**
      * Looks up and instantiates the Serialization Object
-     *
-     * Important to note here that we are not relying on the Hadoop SerializationFactory part of the 
-     * Serialization framework. This is because in the case of Non-Writable Objects, we cannot make any
-     * assumptions about the uniformity of the serialization class APIs - i.e., there may not be a "write"
-     * method call and a subclass may need to implement its own Serialization classes. 
-     * The SerializationFactory currently returns the first (de)serializer that is compatible
-     * with the class to be deserialized;  in this context, that assumption isn't necessarily true.
-     *
+     * 
+     * Important to note here that we are not relying on the Hadoop
+     * SerializationFactory part of the Serialization framework. This is because
+     * in the case of Non-Writable Objects, we cannot make any assumptions about
+     * the uniformity of the serialization class APIs - i.e., there may not be a
+     * "write" method call and a subclass may need to implement its own
+     * Serialization classes. The SerializationFactory currently returns the
+     * first (de)serializer that is compatible with the class to be
+     * deserialized; in this context, that assumption isn't necessarily true.
+     * 
      * @return the serialization object for this context
-     * @exception does not currently throw any IOException
+     * @exception IOException
+     *              not currently thrown by this implementation
      */
     public Serialization<S> getSerialization() throws IOException {
-      Class<Serialization<S>> tClass = (Class<Serialization<S>>)conf.getClass(SerializationImplKey, null, Serialization.class);
-      return tClass == null ? null : (Serialization<S>)ReflectionUtils.newInstance(tClass, conf);
+      Class<Serialization<S>> tClass = (Class<Serialization<S>>) conf.getClass(
+          SerializationImplKey, null, Serialization.class);
+      return tClass == null ? null : (Serialization<S>) ReflectionUtils
+          .newInstance(tClass, conf);
     }
   }
 
-  /** 
-   * An {@link RecordReader} for plain files with {@link Deserializer} records 
-   *
-   * Reads one row at a time of type R.
-   * R is intended to be a base class of something such as: Record, Writable, Text, ...
-   *
+  /**
+   * An {@link RecordReader} for plain files with {@link Deserializer} records
+   * 
+   * Reads one row at a time of type R. R is intended to be a base class of
+   * something such as: Record, Writable, Text, ...
+   * 
    */
-  public class FlatFileRecordReader<R> implements RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
+  public class FlatFileRecordReader<R> implements
+      RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
 
     /**
-     *  An interface for a helper class for instantiating {@link Serialization} classes.
+     * An interface for a helper class for instantiating {@link Serialization}
+     * classes.
      */
     /**
      * The stream in use - is fsin if not compressed, otherwise, it is dcin.
@@ -179,33 +192,37 @@
     private final Deserializer<R> deserializer;
 
     /**
-     * Once EOF is reached, stop calling the deserializer 
+     * Once EOF is reached, stop calling the deserializer
      */
     private boolean isEOF;
 
     /**
-     * The JobConf which contains information needed to instantiate the correct Deserializer
+     * The JobConf which contains information needed to instantiate the correct
+     * Deserializer
      */
-    private Configuration conf;
+    private final Configuration conf;
 
     /**
-     * The actual class of the row's we are deserializing, not just the base class
+     * The actual class of the rows we are deserializing, not just the base
+     * class
      */
-    private Class<R> realRowClass;
-
+    private final Class<R> realRowClass;
 
     /**
-     * FlatFileRecordReader constructor constructs the underlying stream (potentially decompressed) and 
-     * creates the deserializer.
-     *
-     * @param conf the jobconf
-     * @param split the split for this file
+     * FlatFileRecordReader constructor constructs the underlying stream
+     * (potentially decompressed) and creates the deserializer.
+     * 
+     * @param conf
+     *          the jobconf
+     * @param split
+     *          the split for this file
      */
-    public FlatFileRecordReader(Configuration conf,
-                                FileSplit split) throws IOException {
+    public FlatFileRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
       final Path path = split.getPath();
       FileSystem fileSys = path.getFileSystem(conf);
-      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
+      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(
+          conf);
       final CompressionCodec codec = compressionCodecs.getCodec(path);
       this.conf = conf;
 
@@ -221,28 +238,26 @@
       isEOF = false;
       end = split.getLength();
 
-      // Instantiate a SerializationContext which this will use to lookup the Serialization class and the 
+      // Instantiate a SerializationContext which this will use to lookup the
+      // Serialization class and the
       // actual class being deserialized
       SerializationContext<R> sinfo;
-      Class<SerializationContext<R>> sinfoClass = 
-        (Class<SerializationContext<R>>)conf.getClass(SerializationContextImplKey, SerializationContextFromConf.class);
+      Class<SerializationContext<R>> sinfoClass = (Class<SerializationContext<R>>) conf
+          .getClass(SerializationContextImplKey,
+              SerializationContextFromConf.class);
 
-      sinfo =  (SerializationContext<R>)ReflectionUtils.newInstance(sinfoClass, conf);
+      sinfo = (SerializationContext<R>) ReflectionUtils.newInstance(sinfoClass,
+          conf);
 
       // Get the Serialization object and the class being deserialized
       Serialization<R> serialization = sinfo.getSerialization();
-      realRowClass  = (Class<R>)sinfo.getRealClass();
+      realRowClass = (Class<R>) sinfo.getRealClass();
 
-      deserializer = (Deserializer<R>)serialization.getDeserializer((Class<R>)realRowClass);
+      deserializer = serialization.getDeserializer(realRowClass);
       deserializer.open(in);
     }
 
     /**
-     * The actual class of the data being deserialized
-     */
-    private Class<R> realRowclass;
-
-    /**
      * The JobConf key of the SerializationContext to use
      */
     static public final String SerializationContextImplKey = "mapred.input.serialization.context_impl";
@@ -250,34 +265,41 @@
     /**
      * @return null
      */
-    public Void createKey() { 
+    public Void createKey() {
       return null;
     }
 
     /**
      * @return a new R instance.
      */
-    public RowContainer<R> createValue() { 
+    public RowContainer<R> createValue() {
       RowContainer<R> r = new RowContainer<R>();
-      r.row = (R)ReflectionUtils.newInstance(realRowClass, conf);
+      r.row = (R) ReflectionUtils.newInstance(realRowClass, conf);
       return r;
     }
 
     /**
      * Returns the next row # and value
-     *
-     * @param key - void as these files have a value only
-     * @param value - the row container which is always re-used, but the internal value may be set to a new Object
-     * @return whether the key and value were read. True if they were and false if EOF
-     * @exception IOException from the deserializer
-     */
-    public synchronized boolean next(Void key, RowContainer<R> value) throws IOException {
-      if(isEOF  || in.available() == 0) {
+     * 
+     * @param key
+     *          - void as these files have a value only
+     * @param value
+     *          - the row container which is always re-used, but the internal
+     *          value may be set to a new Object
+     * @return whether the key and value were read. True if they were and false
+     *         if EOF
+     * @exception IOException
+     *              from the deserializer
+     */
+    public synchronized boolean next(Void key, RowContainer<R> value)
+        throws IOException {
+      if (isEOF || in.available() == 0) {
         isEOF = true;
         return false;
       }
 
-      // the deserializer is responsible for actually reading each record from the stream
+      // the deserializer is responsible for actually reading each record from
+      // the stream
       try {
         value.row = deserializer.deserialize(value.row);
         if (value.row == null) {
@@ -285,27 +307,27 @@
           return false;
         }
         return true;
-      } catch(EOFException e) {
+      } catch (EOFException e) {
         isEOF = true;
         return false;
       }
     }
 
     public synchronized float getProgress() throws IOException {
-      // this assumes no splitting                                                                                               
+      // this assumes no splitting
       if (end == 0) {
         return 0.0f;
       } else {
-        // gives progress over uncompressed stream                                                                               
+        // gives progress over uncompressed stream
         // assumes deserializer is not buffering itself
-        return Math.min(1.0f, fsin.getPos()/(float)(end));
+        return Math.min(1.0f, fsin.getPos() / (float) (end));
       }
     }
 
     public synchronized long getPos() throws IOException {
       // assumes deserializer is not buffering itself
-      // position over uncompressed stream. not sure what                                                                        
-      // effect this has on stats about job                                                                                      
+      // position over uncompressed stream. not sure what
+      // effect this has on stats about job
       return fsin.getPos();
     }
 
@@ -319,9 +341,9 @@
     return false;
   }
 
+  @Override
   public RecordReader<Void, RowContainer<T>> getRecordReader(InputSplit split,
-                                                             JobConf job, Reporter reporter)
-    throws IOException {
+      JobConf job, Reporter reporter) throws IOException {
 
     reporter.setStatus(split.toString());
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Thu Jan 21 10:37:58 2010
@@ -25,7 +25,6 @@
 import java.util.Properties;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,14 +44,12 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * An util class for various Hive file format tasks.
- * registerOutputFormatSubstitute(Class, Class) 
- * getOutputFormatSubstitute(Class) are added for backward 
- * compatibility. They return the newly added HiveOutputFormat for the older 
- * ones.
+ * registerOutputFormatSubstitute(Class, Class) and getOutputFormatSubstitute(Class)
+ * are added for backward compatibility. They return the newly added
+ * HiveOutputFormat for the older ones.
  * 
  */
 public class HiveFileFormatUtils {
@@ -67,7 +64,7 @@
 
   @SuppressWarnings("unchecked")
   private static Map<Class<? extends OutputFormat>, Class<? extends HiveOutputFormat>> outputFormatSubstituteMap;
-  
+
   /**
    * register a substitute
    * 
@@ -88,8 +85,9 @@
   @SuppressWarnings("unchecked")
   public synchronized static Class<? extends HiveOutputFormat> getOutputFormatSubstitute(
       Class<?> origin) {
-    if (HiveOutputFormat.class.isAssignableFrom(origin))
+    if (HiveOutputFormat.class.isAssignableFrom(origin)) {
       return (Class<? extends HiveOutputFormat>) origin;
+    }
     Class<? extends HiveOutputFormat> result = outputFormatSubstituteMap
         .get(origin);
     return result;
@@ -112,11 +110,13 @@
     }
     return defaultFinalPath;
   }
-  
+
   static {
     inputFormatCheckerMap = new HashMap<Class<? extends InputFormat>, Class<? extends InputFormatChecker>>();
-    HiveFileFormatUtils.registerInputFormatChecker(SequenceFileInputFormat.class, SequenceFileInputFormatChecker.class);
-    HiveFileFormatUtils.registerInputFormatChecker(RCFileInputFormat.class, RCFileInputFormat.class);
+    HiveFileFormatUtils.registerInputFormatChecker(
+        SequenceFileInputFormat.class, SequenceFileInputFormatChecker.class);
+    HiveFileFormatUtils.registerInputFormatChecker(RCFileInputFormat.class,
+        RCFileInputFormat.class);
     inputFormatCheckerInstanceCache = new HashMap<Class<? extends InputFormatChecker>, InputFormatChecker>();
   }
 
@@ -139,12 +139,13 @@
     inputFormatCheckerMap.put(format, checker);
   }
 
-	/**
+  /**
    * get an InputFormatChecker for a file format.
    */
   public synchronized static Class<? extends InputFormatChecker> getInputFormatChecker(
       Class<?> inputFormat) {
-    Class<? extends InputFormatChecker> result = inputFormatCheckerMap.get(inputFormat);
+    Class<? extends InputFormatChecker> result = inputFormatCheckerMap
+        .get(inputFormat);
     return result;
   }
 
@@ -157,14 +158,15 @@
       throws HiveException {
     if (files.size() > 0) {
       Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
-      if(checkerCls==null && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
+      if (checkerCls == null
+          && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
         // we get a text input format here, we can not determine a file is text
         // according to its content, so we can do is to test if other file
         // format can accept it. If one other file format can accept this file,
         // we treat this file as text file, although it maybe not.
-       return checkTextInputFormat(fs, conf, files);
+        return checkTextInputFormat(fs, conf, files);
       }
-      
+
       if (checkerCls != null) {
         InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
             .get(checkerCls);
@@ -190,25 +192,27 @@
         .keySet();
     for (Class<? extends InputFormat> reg : inputFormatter) {
       boolean result = checkInputFormat(fs, conf, reg, files);
-      if (result)
+      if (result) {
         return false;
+      }
     }
     return true;
   }
-  
-  
+
   public static RecordWriter getHiveRecordWriter(JobConf jc,
       tableDesc tableInfo, Class<? extends Writable> outputClass,
       fileSinkDesc conf, Path outPath) throws HiveException {
     try {
-      HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
+      HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
+          .getOutputFileFormatClass().newInstance();
       boolean isCompressed = conf.getCompressed();
       JobConf jc_output = jc;
       if (isCompressed) {
         jc_output = new JobConf(jc);
         String codecStr = conf.getCompressCodec();
         if (codecStr != null && !codecStr.trim().equals("")) {
-          Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
+          Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class
+              .forName(codecStr);
           FileOutputFormat.setOutputCompressorClass(jc_output, codec);
         }
         String type = conf.getCompressType();
@@ -234,5 +238,5 @@
     }
     return null;
   }
-  
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -77,8 +77,8 @@
 
     final int finalRowSeparator = rowSeparator;
     FileSystem fs = outPath.getFileSystem(jc);
-    final OutputStream outStream = Utilities.createCompressedStream(jc,
-        fs.create(outPath), isCompressed);
+    final OutputStream outStream = Utilities.createCompressedStream(jc, fs
+        .create(outPath), isCompressed);
     return new RecordWriter() {
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
@@ -102,7 +102,7 @@
   protected static class IgnoreKeyWriter<K extends WritableComparable, V extends Writable>
       implements org.apache.hadoop.mapred.RecordWriter<K, V> {
 
-    private org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
+    private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
 
     public IgnoreKeyWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
       this.mWriter = writer;
@@ -117,6 +117,7 @@
     }
   }
 
+  @Override
   public org.apache.hadoop.mapred.RecordWriter<K, V> getRecordWriter(
       FileSystem ignored, JobConf job, String name, Progressable progress)
       throws IOException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Jan 21 10:37:58 2010
@@ -22,9 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -39,52 +37,52 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-
 /**
- * HiveInputFormat is a parameterized InputFormat which looks at the path name and determine
- * the correct InputFormat for that path name from mapredPlan.pathToPartitionInfo().
- * It can be used to read files with different input format in the same map-reduce job.
+ * HiveInputFormat is a parameterized InputFormat which looks at the path name
+ * and determines the correct InputFormat for that path name from
+ * mapredPlan.pathToPartitionInfo(). It can be used to read files with different
+ * input format in the same map-reduce job.
  */
-public class HiveInputFormat<K extends WritableComparable,
-                             V extends Writable> implements InputFormat<K, V>, JobConfigurable {
+public class HiveInputFormat<K extends WritableComparable, V extends Writable>
+    implements InputFormat<K, V>, JobConfigurable {
 
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.hive.ql.io.HiveInputFormat");
+  public static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.hive.ql.io.HiveInputFormat");
 
   /**
-   * HiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClass.
-   * The reason that it derives from FileSplit is to make sure "map.input.file" in MapTask.
+   * HiveInputSplit encapsulates an InputSplit with its corresponding
+   * inputFormatClass. The reason that it derives from FileSplit is to make sure
+   * "map.input.file" in MapTask.
    */
-  public static class HiveInputSplit extends FileSplit implements InputSplit, Configurable {
+  public static class HiveInputSplit extends FileSplit implements InputSplit,
+      Configurable {
 
     InputSplit inputSplit;
-    String     inputFormatClassName;
+    String inputFormatClassName;
 
     public HiveInputSplit() {
       // This is the only public constructor of FileSplit
-      super((Path)null, 0, 0, (String[])null);
+      super((Path) null, 0, 0, (String[]) null);
     }
 
     public HiveInputSplit(InputSplit inputSplit, String inputFormatClassName) {
       // This is the only public constructor of FileSplit
-      super((Path)null, 0, 0, (String[])null);
+      super((Path) null, 0, 0, (String[]) null);
       this.inputSplit = inputSplit;
       this.inputFormatClassName = inputFormatClassName;
     }
@@ -92,29 +90,34 @@
     public InputSplit getInputSplit() {
       return inputSplit;
     }
+
     public String inputFormatClassName() {
       return inputFormatClassName;
     }
 
+    @Override
     public Path getPath() {
       if (inputSplit instanceof FileSplit) {
-        return ((FileSplit)inputSplit).getPath();
+        return ((FileSplit) inputSplit).getPath();
       }
       return new Path("");
     }
 
     /** The position of the first byte in the file to process. */
+    @Override
     public long getStart() {
       if (inputSplit instanceof FileSplit) {
-        return ((FileSplit)inputSplit).getStart();
+        return ((FileSplit) inputSplit).getStart();
       }
       return 0;
     }
 
+    @Override
     public String toString() {
       return inputFormatClassName + ":" + inputSplit.toString();
     }
 
+    @Override
     public long getLength() {
       long r = 0;
       try {
@@ -125,23 +128,27 @@
       return r;
     }
 
+    @Override
     public String[] getLocations() throws IOException {
       return inputSplit.getLocations();
     }
 
+    @Override
     public void readFields(DataInput in) throws IOException {
       String inputSplitClassName = in.readUTF();
       try {
-        inputSplit = (InputSplit) ReflectionUtils.newInstance(
-            conf.getClassByName(inputSplitClassName), conf);
+        inputSplit = (InputSplit) ReflectionUtils.newInstance(conf
+            .getClassByName(inputSplitClassName), conf);
       } catch (Exception e) {
-        throw new IOException("Cannot create an instance of InputSplit class = "
-            + inputSplitClassName + ":" + e.getMessage());
+        throw new IOException(
+            "Cannot create an instance of InputSplit class = "
+                + inputSplitClassName + ":" + e.getMessage());
       }
       inputSplit.readFields(in);
       inputFormatClassName = in.readUTF();
     }
 
+    @Override
     public void write(DataOutput out) throws IOException {
       out.writeUTF(inputSplit.getClass().getName());
       inputSplit.write(out);
@@ -149,7 +156,7 @@
     }
 
     Configuration conf;
-    
+
     @Override
     public Configuration getConf() {
       return conf;
@@ -170,19 +177,21 @@
   /**
    * A cache of InputFormat instances.
    */
-  private static Map<Class,InputFormat<WritableComparable, Writable>> inputFormats;
-  static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass, JobConf job) throws IOException {
+  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats;
+
+  static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
+      Class inputFormatClass, JobConf job) throws IOException {
     if (inputFormats == null) {
       inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
     }
     if (!inputFormats.containsKey(inputFormatClass)) {
       try {
-        InputFormat<WritableComparable, Writable> newInstance =
-            (InputFormat<WritableComparable, Writable>)ReflectionUtils.newInstance(inputFormatClass, job);
+        InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
+            .newInstance(inputFormatClass, job);
         inputFormats.put(inputFormatClass, newInstance);
       } catch (Exception e) {
-        throw new IOException("Cannot create an instance of InputFormat class " + inputFormatClass.getName()
-            + " as specified in mapredWork!");
+        throw new IOException("Cannot create an instance of InputFormat class "
+            + inputFormatClass.getName() + " as specified in mapredWork!");
       }
     }
     return inputFormats.get(inputFormatClass);
@@ -191,7 +200,7 @@
   public RecordReader getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
 
-    HiveInputSplit hsplit = (HiveInputSplit)split;
+    HiveInputSplit hsplit = (HiveInputSplit) split;
 
     InputSplit inputSplit = hsplit.getInputSplit();
     String inputFormatClassName = null;
@@ -203,14 +212,15 @@
       throw new IOException("cannot find class " + inputFormatClassName);
     }
 
-    //clone a jobConf for setting needed columns for reading
+    // clone a jobConf for setting needed columns for reading
     JobConf cloneJobConf = new JobConf(job);
-    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath().toString(), 
-                      hsplit.getPath().toUri().getPath());
+    initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+        .toString(), hsplit.getPath().toUri().getPath());
 
-    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, cloneJobConf);
+    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
+        cloneJobConf);
     return new HiveRecordReader(inputFormat.getRecordReader(inputSplit,
-    		cloneJobConf, reporter));
+        cloneJobConf, reporter));
   }
 
   private Map<String, partitionDesc> pathToPartitionInfo;
@@ -233,16 +243,17 @@
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
 
     // for each dir, get the InputFormat, and do getSplits.
-    for(Path dir: dirs) {
-    	partitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this class
+    for (Path dir : dirs) {
+      partitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
+      // create a new InputFormat instance if this is the first time to see this
+      // class
       Class inputFormatClass = part.getInputFileFormatClass();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
 
       FileInputFormat.setInputPaths(newjob, dir);
       newjob.setInputFormat(inputFormat.getClass());
-      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits/dirs.length);
-      for(InputSplit is: iss) {
+      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
+      for (InputSplit is : iss) {
         result.add(new HiveInputSplit(is, inputFormatClass.getName()));
       }
     }
@@ -260,10 +271,12 @@
     JobConf newjob = new JobConf(job);
 
     // for each dir, get the InputFormat, and do validateInput.
-    for (Path dir: dirs) {
+    for (Path dir : dirs) {
       partitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this class
-      InputFormat inputFormat = getInputFormatFromCache(part.getInputFileFormatClass(), job);
+      // create a new InputFormat instance if this is the first time to see this
+      // class
+      InputFormat inputFormat = getInputFormatFromCache(part
+          .getInputFileFormatClass(), job);
 
       FileInputFormat.setInputPaths(newjob, dir);
       newjob.setInputFormat(inputFormat.getClass());
@@ -271,47 +284,53 @@
     }
   }
 
-	protected static partitionDesc getPartitionDescFromPath(Map<String, partitionDesc> pathToPartitionInfo,
-                                                  Path dir) throws IOException {
+  protected static partitionDesc getPartitionDescFromPath(
+      Map<String, partitionDesc> pathToPartitionInfo, Path dir)
+      throws IOException {
     partitionDesc partDesc = pathToPartitionInfo.get(dir.toString());
     if (partDesc == null) {
       partDesc = pathToPartitionInfo.get(dir.toUri().getPath());
     }
     if (partDesc == null) {
-      throw new IOException("cannot find dir = " + dir.toString() + " in partToPartitionInfo!");
+      throw new IOException("cannot find dir = " + dir.toString()
+          + " in partToPartitionInfo!");
     }
 
     return partDesc;
   }
 
   protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass,
-                                   String splitPath, String splitPathWithNoSchema) {
-    if (this.mrwork == null)
+      String splitPath, String splitPathWithNoSchema) {
+    if (this.mrwork == null) {
       init(job);
+    }
 
     ArrayList<String> aliases = new ArrayList<String>();
-    Iterator<Entry<String, ArrayList<String>>> iterator = 
-      this.mrwork.getPathToAliases().entrySet().iterator();
+    Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
+        .getPathToAliases().entrySet().iterator();
 
     while (iterator.hasNext()) {
       Entry<String, ArrayList<String>> entry = iterator.next();
       String key = entry.getKey();
       if (splitPath.startsWith(key) || splitPathWithNoSchema.startsWith(key)) {
         ArrayList<String> list = entry.getValue();
-        for (String val : list)
+        for (String val : list) {
           aliases.add(val);
+        }
       }
     }
 
     for (String alias : aliases) {
-      Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(alias);
+      Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(
+          alias);
       if (op instanceof TableScanOperator) {
         TableScanOperator tableScan = (TableScanOperator) op;
         ArrayList<Integer> list = tableScan.getNeededColumnIDs();
-        if (list != null)
-        	ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
-        else
-        	ColumnProjectionUtils.setFullyReadColumns(jobConf);
+        if (list != null) {
+          ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
+        } else {
+          ColumnProjectionUtils.setFullyReadColumns(jobConf);
+        }
       }
     }
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Thu Jan 21 10:37:58 2010
@@ -21,46 +21,52 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
 
-/** HiveKey is a simple wrapper on Text which allows us to set the hashCode easily.
- *  hashCode is used for hadoop partitioner.
- */ 
+/**
+ * HiveKey is a simple wrapper on BytesWritable which allows us to set the
+ * hashCode easily. The hashCode is used by the Hadoop partitioner.
+ */
 public class HiveKey extends BytesWritable {
 
   private static final int LENGTH_BYTES = 4;
-  
+
   boolean hashCodeValid;
+
   public HiveKey() {
-    hashCodeValid = false; 
+    hashCodeValid = false;
   }
-  
-  protected int myHashCode; 
+
+  protected int myHashCode;
+
   public void setHashCode(int myHashCode) {
-    this.hashCodeValid = true;
+    hashCodeValid = true;
     this.myHashCode = myHashCode;
   }
+
+  @Override
   public int hashCode() {
     if (!hashCodeValid) {
-      throw new RuntimeException("Cannot get hashCode() from deserialized " + HiveKey.class);
+      throw new RuntimeException("Cannot get hashCode() from deserialized "
+          + HiveKey.class);
     }
     return myHashCode;
   }
 
-  /** A Comparator optimized for HiveKey. */ 
+  /** A Comparator optimized for HiveKey. */
   public static class Comparator extends WritableComparator {
     public Comparator() {
       super(HiveKey.class);
     }
-    
+
     /**
      * Compare the buffers in serialized form.
      */
-    public int compare(byte[] b1, int s1, int l1,
-                       byte[] b2, int s2, int l2) {
-      return compareBytes(b1, s1+LENGTH_BYTES, l1-LENGTH_BYTES, 
-                          b2, s2+LENGTH_BYTES, l2-LENGTH_BYTES);
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2
+          + LENGTH_BYTES, l2 - LENGTH_BYTES);
     }
   }
-  
+
   static {
     WritableComparator.define(HiveKey.class, new Comparator());
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -30,9 +30,10 @@
 
 /**
  * <code>HiveOutputFormat</code> describes the output-specification for Hive's
- * operators. It has a method {@link #getHiveRecordWriter(JobConf, Path, Class, 
- * boolean, Properties, Progressable)}, with various parameters used to create 
- * the final out file and get some specific settings.
+ * operators. Its method
+ * {@link #getHiveRecordWriter(JobConf, Path, Class, boolean, Properties, Progressable)}
+ * takes various parameters used to create the final output file and to get
+ * some specific settings.
  * 
  * @see org.apache.hadoop.mapred.OutputFormat
  * @see RecordWriter

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java Thu Jan 21 10:37:58 2010
@@ -18,44 +18,46 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hive.ql.exec.ExecMapper;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import java.io.IOException;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class HiveRecordReader<K extends WritableComparable, V extends Writable>
+    implements RecordReader<K, V> {
 
-public class HiveRecordReader<K extends WritableComparable, V extends Writable>  
-  implements RecordReader<K, V> {
+  private final RecordReader recordReader;
 
-  private RecordReader recordReader;
-  public HiveRecordReader(RecordReader recordReader){ 
+  public HiveRecordReader(RecordReader recordReader) {
     this.recordReader = recordReader;
   }
 
-  public void close() throws IOException { 
-    recordReader.close(); 
+  public void close() throws IOException {
+    recordReader.close();
   }
 
-  public K createKey() { 
-    return (K)recordReader.createKey();
+  public K createKey() {
+    return (K) recordReader.createKey();
   }
 
-  public V createValue() { 
-    return (V)recordReader.createValue();
+  public V createValue() {
+    return (V) recordReader.createValue();
   }
 
-  public long getPos() throws IOException { 
+  public long getPos() throws IOException {
     return recordReader.getPos();
   }
 
-  public float getProgress() throws IOException { 
+  public float getProgress() throws IOException {
     return recordReader.getProgress();
   }
-  
-  public boolean  next(K key, V value) throws IOException { 
-    if (ExecMapper.getDone())
+
+  public boolean next(K key, V value) throws IOException {
+    if (ExecMapper.getDone()) {
       return false;
+    }
     return recordReader.next(key, value);
   }
 }
-

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -34,9 +34,8 @@
 import org.apache.hadoop.util.Progressable;
 
 /** A {@link HiveOutputFormat} that writes {@link SequenceFile}s. */
-public class HiveSequenceFileOutputFormat extends
-    SequenceFileOutputFormat implements
-    HiveOutputFormat<WritableComparable, Writable> {
+public class HiveSequenceFileOutputFormat extends SequenceFileOutputFormat
+    implements HiveOutputFormat<WritableComparable, Writable> {
 
   BytesWritable EMPTY_KEY = new BytesWritable();
 
@@ -64,8 +63,7 @@
 
     FileSystem fs = finalOutPath.getFileSystem(jc);
     final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
-        fs, finalOutPath, BytesWritable.class, valueClass,
-        isCompressed);
+        fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
 
     return new RecordWriter() {
       public void write(Writable r) throws IOException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -30,42 +30,39 @@
 import org.apache.hadoop.util.Progressable;
 
 /**
- * This class replaces key with null before feeding the <key, value> 
- * to TextOutputFormat.RecordWriter.
+ * This class replaces the key with null before feeding the
+ * {@code <key, value>} pair to TextOutputFormat.RecordWriter.
  * 
  * @deprecated use {@link HiveIgnoreKeyTextOutputFormat} instead}
  */
-public class IgnoreKeyTextOutputFormat<K extends WritableComparable, 
-                                       V extends Writable> 
-  extends TextOutputFormat<K, V> {
-
-  protected static class IgnoreKeyWriter<K extends WritableComparable, 
-                                          V extends Writable>
-    implements RecordWriter<K, V> {
-    
-    private RecordWriter<K, V> mWriter; 
-    
+@Deprecated
+public class IgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
+    extends TextOutputFormat<K, V> {
+
+  protected static class IgnoreKeyWriter<K extends WritableComparable, V extends Writable>
+      implements RecordWriter<K, V> {
+
+    private final RecordWriter<K, V> mWriter;
+
     public IgnoreKeyWriter(RecordWriter<K, V> writer) {
       this.mWriter = writer;
     }
-    
+
     public synchronized void write(K key, V value) throws IOException {
-      this.mWriter.write(null, value);      
+      this.mWriter.write(null, value);
     }
 
     public void close(Reporter reporter) throws IOException {
       this.mWriter.close(reporter);
     }
   }
-  
-  public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
-                                            JobConf job,
-                                            String name,
-                                            Progressable progress)
-    throws IOException {
-    
-    return new IgnoreKeyWriter<K, V>(super.getRecordWriter(ignored, job, name, progress));
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
+      String name, Progressable progress) throws IOException {
+
+    return new IgnoreKeyWriter<K, V>(super.getRecordWriter(ignored, job, name,
+        progress));
   }
 
-      
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java Thu Jan 21 10:37:58 2010
@@ -30,10 +30,11 @@
  */
 public interface InputFormatChecker {
 
-	/**
-	 * This method is used to validate the input files
-	 * 
-	 */
-	public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws IOException;
+  /**
+   * This method is used to validate the input files
+   * 
+   */
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException;
 
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java Thu Jan 21 10:37:58 2010
@@ -31,9 +31,10 @@
  * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
  * synchronized modifiers.
  */
-public class NonSyncDataInputBuffer extends FilterInputStream implements DataInput {
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+    DataInput {
 
-  private NonSyncByteArrayInputStream buffer;
+  private final NonSyncByteArrayInputStream buffer;
 
   byte[] buff = new byte[16];
 
@@ -77,7 +78,7 @@
    * 
    * @throws IOException
    *           If a problem occurs reading from this DataInputStream.
-   *
+   * 
    */
   @Override
   public final int read(byte[] buffer) throws IOException {
@@ -156,8 +157,9 @@
 
     while (offset < count) {
       int bytesRead = in.read(buff, offset, count - offset);
-      if (bytesRead == -1)
+      if (bytesRead == -1) {
         return bytesRead;
+      }
       offset += bytesRead;
     }
     return offset;
@@ -464,22 +466,26 @@
       int utfSize) throws UTFDataFormatException {
     int count = 0, s = 0, a;
     while (count < utfSize) {
-      if ((out[s] = (char) buf[offset + count++]) < '\u0080')
+      if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
         s++;
-      else if (((a = out[s]) & 0xe0) == 0xc0) {
-        if (count >= utfSize)
+      } else if (((a = out[s]) & 0xe0) == 0xc0) {
+        if (count >= utfSize) {
           throw new UTFDataFormatException();
+        }
         int b = buf[count++];
-        if ((b & 0xC0) != 0x80)
+        if ((b & 0xC0) != 0x80) {
           throw new UTFDataFormatException();
+        }
         out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
       } else if ((a & 0xf0) == 0xe0) {
-        if (count + 1 >= utfSize)
+        if (count + 1 >= utfSize) {
           throw new UTFDataFormatException();
+        }
         int b = buf[count++];
         int c = buf[count++];
-        if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80))
+        if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
           throw new UTFDataFormatException();
+        }
         out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
       } else {
         throw new UTFDataFormatException();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java Thu Jan 21 10:37:58 2010
@@ -29,7 +29,7 @@
  */
 public class NonSyncDataOutputBuffer extends DataOutputStream {
 
-  private NonSyncByteArrayOutputStream buffer;
+  private final NonSyncByteArrayOutputStream buffer;
 
   /** Constructs a new empty buffer. */
   public NonSyncDataOutputBuffer() {
@@ -56,7 +56,7 @@
 
   /** Resets the buffer to empty. */
   public NonSyncDataOutputBuffer reset() {
-    this.written = 0;
+    written = 0;
     buffer.reset();
     return this;
   }
@@ -66,11 +66,13 @@
     buffer.write(in, length);
   }
 
+  @Override
   public void write(int b) throws IOException {
     buffer.write(b);
     incCount(1);
   }
 
+  @Override
   public void write(byte b[], int off, int len) throws IOException {
     buffer.write(b, off, len);
     incCount(len);
@@ -79,7 +81,8 @@
   private void incCount(int value) {
     if (written + value < 0) {
       written = Integer.MAX_VALUE;
-    } else
+    } else {
       written += value;
+    }
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Thu Jan 21 10:37:58 2010
@@ -37,7 +37,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.CodecPool;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
@@ -221,7 +220,8 @@
      * @param colValLenBuffer
      *          each cell's length of this column's in this split
      */
-    void setColumnLenInfo(int columnValueLen, NonSyncDataOutputBuffer colValLenBuffer,
+    void setColumnLenInfo(int columnValueLen,
+        NonSyncDataOutputBuffer colValLenBuffer,
         int columnUncompressedValueLen, int columnIndex) {
       eachColumnValueLen[columnIndex] = columnValueLen;
       eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
@@ -239,10 +239,11 @@
         eachColumnValueLen[i] = WritableUtils.readVInt(in);
         eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
         int bufLen = WritableUtils.readVInt(in);
-        if (allCellValLenBuffer[i] == null)
+        if (allCellValLenBuffer[i] == null) {
           allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
-        else
+        } else {
           allCellValLenBuffer[i].reset();
+        }
         allCellValLenBuffer[i].write(in, bufLen);
       }
     }
@@ -292,12 +293,12 @@
    * </ul>
    */
   static class ValueBuffer implements Writable {
-    
+
     class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
-      
+
       int index = -1;
       int colIndex = -1;
-      
+
       public LazyDecompressionCallbackImpl(int index, int colIndex) {
         super();
         this.index = index;
@@ -306,27 +307,30 @@
 
       @Override
       public byte[] decompress() throws IOException {
-        
-        if (decompressedFlag[index] || codec == null)
+
+        if (decompressedFlag[index] || codec == null) {
           return loadedColumnsValueBuffer[index].getData();
-        
+        }
+
         NonSyncDataOutputBuffer compressedData = loadedColumnsValueBuffer[index];
         NonSyncDataOutputBuffer decompressedData = new NonSyncDataOutputBuffer();
         decompressBuffer.reset();
         DataInputStream valueIn = new DataInputStream(deflatFilter);
         deflatFilter.resetState();
-        decompressBuffer.reset(compressedData.getData(), keyBuffer.eachColumnValueLen[colIndex]);
-        decompressedData.write(valueIn, keyBuffer.eachColumnUncompressedValueLen[colIndex]);
+        decompressBuffer.reset(compressedData.getData(),
+            keyBuffer.eachColumnValueLen[colIndex]);
+        decompressedData.write(valueIn,
+            keyBuffer.eachColumnUncompressedValueLen[colIndex]);
         loadedColumnsValueBuffer[index] = decompressedData;
         decompressedFlag[index] = true;
         return decompressedData.getData();
       }
     }
-    
+
     // used to load columns' value into memory
     private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
     private boolean[] decompressedFlag = null;
-    private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null; 
+    private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
 
     boolean inited = false;
 
@@ -362,19 +366,24 @@
         skippedColIDs = skippedCols;
       } else {
         skippedColIDs = new boolean[columnNumber];
-        for (int i = 0; i < skippedColIDs.length; i++)
+        for (int i = 0; i < skippedColIDs.length; i++) {
           skippedColIDs[i] = false;
+        }
       }
 
       int skipped = 0;
       if (skippedColIDs != null) {
-        for (boolean currentSkip : skippedColIDs)
-          if (currentSkip)
+        for (boolean currentSkip : skippedColIDs) {
+          if (currentSkip) {
             skipped++;
+          }
+        }
       }
-      loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber - skipped];
+      loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+          - skipped];
       decompressedFlag = new boolean[columnNumber - skipped];
-      lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber - skipped];
+      lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
+          - skipped];
       this.codec = codec;
       if (codec != null) {
         valDecompressor = CodecPool.getDecompressor(codec);
@@ -383,12 +392,14 @@
       }
 
       for (int k = 0, readIndex = 0; k < columnNumber; k++) {
-        if (skippedColIDs[k])
+        if (skippedColIDs[k]) {
           continue;
+        }
         loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
-        if(codec != null) {
+        if (codec != null) {
           decompressedFlag[readIndex] = false;
-          lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(readIndex, k);
+          lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
+              readIndex, k);
         } else {
           decompressedFlag[readIndex] = true;
         }
@@ -396,7 +407,8 @@
       }
     }
 
-    public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer, int addIndex) {
+    public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
+        int addIndex) {
       loadedColumnsValueBuffer[addIndex] = valBuffer;
     }
 
@@ -420,8 +432,9 @@
         NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
         valBuf.reset();
         valBuf.write(in, vaRowsLen);
-        if(codec != null)
+        if (codec != null) {
           decompressedFlag[addIndex] = false;
+        }
         addIndex++;
       }
 
@@ -432,8 +445,7 @@
 
     @Override
     public void write(DataOutput out) throws IOException {
-      for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
-        NonSyncDataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
+      for (NonSyncDataOutputBuffer currentBuf : loadedColumnsValueBuffer) {
         out.write(currentBuf.getData(), 0, currentBuf.getLength());
       }
     }
@@ -443,8 +455,8 @@
     }
 
     public void close() {
-      for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
-        IOUtils.closeStream(loadedColumnsValueBuffer[i]);
+      for (NonSyncDataOutputBuffer element : loadedColumnsValueBuffer) {
+        IOUtils.closeStream(element);
       }
       if (codec != null) {
         IOUtils.closeStream(decompressBuffer);
@@ -496,7 +508,7 @@
     NonSyncDataOutputBuffer[] compressionBuffer;
     CompressionOutputStream[] deflateFilter = null;
     DataOutputStream[] deflateOut = null;
-    private ColumnBuffer[] columnBuffers;
+    private final ColumnBuffer[] columnBuffers;
 
     NonSyncDataOutputBuffer keyCompressionBuffer;
     CompressionOutputStream keyDeflateFilter;
@@ -505,7 +517,7 @@
 
     private int columnNumber = 0;
 
-    private int[] columnValuePlainLength;
+    private final int[] columnValuePlainLength;
 
     KeyBuffer key = null;
     ValueBuffer value = null;
@@ -528,7 +540,7 @@
        */
       int runLength = 0;
       int prevValueLength = -1;
-      
+
       ColumnBuffer() throws IOException {
         columnValBuffer = new NonSyncDataOutputBuffer();
         valLenBuffer = new NonSyncDataOutputBuffer();
@@ -537,17 +549,17 @@
       public void append(BytesRefWritable data) throws IOException {
         data.writeDataTo(columnValBuffer);
         int currentLen = data.getLength();
-        
-        if( prevValueLength < 0) {
+
+        if (prevValueLength < 0) {
           startNewGroup(currentLen);
           return;
         }
-        
-        if(currentLen != prevValueLength) {
+
+        if (currentLen != prevValueLength) {
           flushGroup();
           startNewGroup(currentLen);
         } else {
-          runLength ++;
+          runLength++;
         }
       }
 
@@ -563,12 +575,13 @@
         prevValueLength = -1;
         runLength = 0;
       }
-      
+
       public void flushGroup() throws IOException {
         if (prevValueLength >= 0) {
           WritableUtils.writeVLong(valLenBuffer, prevValueLength);
-          if (runLength > 0)
+          if (runLength > 0) {
             WritableUtils.writeVLong(valLenBuffer, ~runLength);
+          }
           runLength = -1;
           prevValueLength = -1;
         }
@@ -645,8 +658,9 @@
       RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
       columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
 
-      if (metadata == null)
+      if (metadata == null) {
         metadata = new Metadata();
+      }
       metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
           + columnNumber));
 
@@ -766,9 +780,10 @@
      */
     public void append(Writable val) throws IOException {
 
-      if (!(val instanceof BytesRefArrayWritable))
+      if (!(val instanceof BytesRefArrayWritable)) {
         throw new UnsupportedOperationException(
             "Currently the writer can only accept BytesRefArrayWritable");
+      }
 
       BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
       int size = columns.size();
@@ -802,7 +817,7 @@
       for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
         ColumnBuffer currentBuf = columnBuffers[columnIndex];
         currentBuf.flushGroup();
-        
+
         NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
 
         if (isCompressed()) {
@@ -829,8 +844,9 @@
       }
 
       int keyLength = key.getSize();
-      if (keyLength < 0)
+      if (keyLength < 0) {
         throw new IOException("negative length keys not allowed: " + key);
+      }
 
       // Write the record out
       checkAndWriteSync(); // sync
@@ -865,8 +881,9 @@
     }
 
     public synchronized void close() throws IOException {
-      if (bufferedRecords > 0)
+      if (bufferedRecords > 0) {
         flushRecords();
+      }
       clearColumnBuffers();
 
       if (isCompressed()) {
@@ -898,25 +915,25 @@
    */
   public static class Reader {
 
-    private Path file;
-    private FSDataInputStream in;
+    private final Path file;
+    private final FSDataInputStream in;
 
     private byte version;
 
     private CompressionCodec codec = null;
     private Metadata metadata = null;
 
-    private byte[] sync = new byte[SYNC_HASH_SIZE];
-    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+    private final byte[] sync = new byte[SYNC_HASH_SIZE];
+    private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
     private boolean syncSeen;
 
-    private long end;
+    private final long end;
     private int currentKeyLength;
     private int currentRecordLength;
 
-    private Configuration conf;
+    private final Configuration conf;
 
-    private ValueBuffer currentValue;
+    private final ValueBuffer currentValue;
 
     private boolean[] skippedColIDs = null;
 
@@ -931,14 +948,14 @@
     private int passedRowsNum = 0;
 
     private int[] columnRowReadIndex = null;
-    private NonSyncDataInputBuffer[] colValLenBufferReadIn;
-    private int[] columnRunLength;
-    private int[] columnPrvLength;
+    private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
+    private final int[] columnRunLength;
+    private final int[] columnPrvLength;
     private boolean decompress = false;
 
     private Decompressor keyDecompressor;
     NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
-    
+
     int[] prjColIDs = null; // selected column IDs
 
     /** Create a new RCFile reader. */
@@ -966,15 +983,17 @@
       columnNumber = Integer.parseInt(metadata.get(
           new Text(COLUMN_NUMBER_METADATA_STR)).toString());
 
-      java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+      java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
+          .getReadColumnIDs(conf);
       skippedColIDs = new boolean[columnNumber];
       if (notSkipIDs.size() > 0) {
         for (int i = 0; i < skippedColIDs.length; i++) {
           skippedColIDs[i] = true;
         }
         for (int read : notSkipIDs) {
-          if (read < columnNumber)
+          if (read < columnNumber) {
             skippedColIDs[read] = false;
+          }
         }
       } else {
         // TODO: if no column name is specified e.g, in select count(1) from tt;
@@ -987,15 +1006,16 @@
 
       loadColumnNum = columnNumber;
       if (skippedColIDs != null && skippedColIDs.length > 0) {
-        for (int i = 0; i < skippedColIDs.length; i++) {
-          if (skippedColIDs[i])
+        for (boolean skippedColID : skippedColIDs) {
+          if (skippedColID) {
             loadColumnNum -= 1;
+          }
         }
       }
 
       // get list of selected column IDs
       prjColIDs = new int[loadColumnNum];
-      for ( int i = 0, j = 0; i < columnNumber; ++i ) {
+      for (int i = 0, j = 0; i < columnNumber; ++i) {
         if (!skippedColIDs[i]) {
           prjColIDs[j++] = i;
         }
@@ -1007,8 +1027,9 @@
       columnRowReadIndex = new int[columnNumber];
       for (int i = 0; i < columnNumber; i++) {
         columnRowReadIndex[i] = 0;
-        if (!skippedColIDs[i])
+        if (!skippedColIDs[i]) {
           colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+        }
         columnRunLength[i] = 0;
         columnPrvLength[i] = -1;
       }
@@ -1022,20 +1043,23 @@
       in.readFully(versionBlock);
 
       if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1])
-          || (versionBlock[2] != VERSION[2]))
+          || (versionBlock[2] != VERSION[2])) {
         throw new IOException(file + " not a RCFile");
+      }
 
       // Set 'version'
       version = versionBlock[3];
-      if (version > VERSION[3])
+      if (version > VERSION[3]) {
         throw new VersionMismatchException(VERSION[3], version);
+      }
 
       try {
         Class<?> keyCls = conf.getClassByName(Text.readString(in));
         Class<?> valCls = conf.getClassByName(Text.readString(in));
         if (!keyCls.equals(KeyBuffer.class)
-            || !valCls.equals(ValueBuffer.class))
+            || !valCls.equals(ValueBuffer.class)) {
           throw new IOException(file + " not a RCFile");
+        }
       } catch (ClassNotFoundException e) {
         throw new IOException(file + " not a RCFile", e);
       }
@@ -1110,8 +1134,9 @@
         for (int i = 0; in.getPos() < end; i++) {
           int j = 0;
           for (; j < syncLen; j++) {
-            if (sync[j] != syncCheck[(i + j) % syncLen])
+            if (sync[j] != syncCheck[(i + j) % syncLen]) {
               break;
+            }
           }
           if (j == syncLen) {
             in.seek(in.getPos() - SYNC_SIZE); // position before
@@ -1160,8 +1185,9 @@
         // a
         // sync entry
         in.readFully(syncCheck); // read syncCheck
-        if (!Arrays.equals(sync, syncCheck)) // check it
+        if (!Arrays.equals(sync, syncCheck)) {
           throw new IOException("File is corrupt!");
+        }
         syncSeen = true;
         if (in.getPos() >= end) {
           return -1;
@@ -1174,8 +1200,9 @@
     }
 
     private void seekToNextKeyBuffer() throws IOException {
-      if (!keyInit)
+      if (!keyInit) {
         return;
+      }
       if (!currentValue.inited) {
         in.skip(currentRecordLength - currentKeyLength);
       }
@@ -1220,21 +1247,22 @@
       readRowsIndexInBuffer = 0;
       recordsNumInValBuffer = currentKey.numberRows;
 
-      for (int j = 0; j < prjColIDs.length; j++) {
-        int i = prjColIDs[j];
+      for (int prjColID : prjColIDs) {
+        int i = prjColID;
         colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
             .getData(), currentKey.allCellValLenBuffer[i].getLength());
         columnRowReadIndex[i] = 0;
         columnRunLength[i] = 0;
         columnPrvLength[i] = -1;
       }
-      
+
       return currentKeyLength;
     }
 
     protected void currentValueBuffer() throws IOException {
-      if (!keyInit)
+      if (!keyInit) {
         nextKeyBuffer();
+      }
       currentValue.keyBuffer = currentKey;
       currentValue.clearColumnBuffer();
       currentValue.readFields(in);
@@ -1245,7 +1273,7 @@
 
     // use this buffer to hold column's cells value length for usages in
     // getColumn(), instead of using colValLenBufferReadIn directly.
-    private NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
+    private final NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
 
     /**
      * Fetch all data in the buffer for a given column. This is useful for
@@ -1271,8 +1299,9 @@
 
       rest.resetValid(recordsNumInValBuffer);
 
-      if (!currentValue.inited)
+      if (!currentValue.inited) {
         currentValueBuffer();
+      }
 
       int columnNextRowStart = 0;
       fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
@@ -1281,10 +1310,13 @@
         int length = getColumnNextValueLength(columnID);
 
         BytesRefWritable currentCell = rest.get(i);
-        if (currentValue.decompressedFlag[columnID])
-          currentCell.set(currentValue.loadedColumnsValueBuffer[columnID].getData(), columnNextRowStart, length);
-        else
-          currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID], columnNextRowStart, length);
+        if (currentValue.decompressedFlag[columnID]) {
+          currentCell.set(currentValue.loadedColumnsValueBuffer[columnID]
+              .getData(), columnNextRowStart, length);
+        } else {
+          currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID],
+              columnNextRowStart, length);
+        }
         columnNextRowStart = columnNextRowStart + length;
       }
       return rest;
@@ -1292,8 +1324,9 @@
 
     /**
      * Read in next key buffer and throw any data in current key buffer and
-     * current value buffer. It will influence the result of {@link
-     * #next(LongWritable)} and {@link #getCurrentRow(BytesRefArrayWritable)}
+     * current value buffer. It will influence the result of
+     * {@link #next(LongWritable)} and
+     * {@link #getCurrentRow(BytesRefArrayWritable)}
      * 
      * @return whether there still has records or not
      * @throws IOException
@@ -1349,14 +1382,15 @@
     public synchronized void getCurrentRow(BytesRefArrayWritable ret)
         throws IOException {
 
-      if (!keyInit || rowFetched)
+      if (!keyInit || rowFetched) {
         return;
+      }
 
       if (!currentValue.inited) {
         currentValueBuffer();
-				// do this only when not initialized, but we may need to find a way to
-				// tell the caller how to initialize the valid size
-        ret.resetValid(columnNumber); 
+        // do this only when not initialized, but we may need to find a way to
+        // tell the caller how to initialize the valid size
+        ret.resetValid(columnNumber);
       }
 
       // we do not use BytesWritable here to avoid the byte-copy from
@@ -1367,14 +1401,17 @@
 
         BytesRefWritable ref = ret.unCheckedGet(i);
 
-        int columnCurrentRowStart = (int) columnRowReadIndex[i];
+        int columnCurrentRowStart = columnRowReadIndex[i];
         int length = getColumnNextValueLength(i);
         columnRowReadIndex[i] = columnCurrentRowStart + length;
 
-        if (currentValue.decompressedFlag[j])
-          ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), columnCurrentRowStart, length);
-        else
-          ref.set(currentValue.lazyDecompressCallbackObjs[j], columnCurrentRowStart, length);
+        if (currentValue.decompressedFlag[j]) {
+          ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+              columnCurrentRowStart, length);
+        } else {
+          ref.set(currentValue.lazyDecompressCallbackObjs[j],
+              columnCurrentRowStart, length);
+        }
       }
       rowFetched = true;
     }
@@ -1405,6 +1442,7 @@
     }
 
     /** Returns the name of the file. */
+    @Override
     public String toString() {
       return file.toString();
     }
@@ -1413,7 +1451,7 @@
     public void close() {
       IOUtils.closeStream(in);
       currentValue.close();
-      if (this.decompress) {
+      if (decompress) {
         IOUtils.closeStream(keyDecompressedData);
         CodecPool.returnDecompressor(keyDecompressor);
       }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java Thu Jan 21 10:37:58 2010
@@ -35,12 +35,13 @@
 import org.apache.hadoop.mapred.Reporter;
 
 public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>
-    extends FileInputFormat<K, V> implements InputFormatChecker{
+    extends FileInputFormat<K, V> implements InputFormatChecker {
 
   public RCFileInputFormat() {
     setMinSplitSize(SequenceFile.SYNC_INTERVAL);
   }
 
+  @Override
   @SuppressWarnings("unchecked")
   public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
@@ -53,11 +54,13 @@
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
       ArrayList<FileStatus> files) throws IOException {
-    if (files.size() <= 0)
+    if (files.size() <= 0) {
       return false;
+    }
     for (int fileId = 0; fileId < files.size(); fileId++) {
       try {
-        RCFile.Reader reader = new RCFile.Reader(fs, files.get(fileId).getPath(), conf);
+        RCFile.Reader reader = new RCFile.Reader(fs, files.get(fileId)
+            .getPath(), conf);
         reader.close();
       } catch (IOException e) {
         return false;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -67,6 +67,7 @@
   }
 
   /** {@inheritDoc} */
+  @Override
   public RecordWriter<WritableComparable, BytesRefArrayWritable> getRecordWriter(
       FileSystem ignored, JobConf job, String name, Progressable progress)
       throws IOException {
@@ -124,10 +125,11 @@
 
     String[] cols = null;
     String columns = tableProperties.getProperty("columns");
-    if (columns == null || columns.trim().equals(""))
+    if (columns == null || columns.trim().equals("")) {
       cols = new String[0];
-    else
+    } else {
       cols = StringUtils.split(columns, ",");
+    }
 
     RCFileOutputFormat.setColumnNumber(jc, cols.length);
     final RCFile.Writer outWriter = Utilities.createRCFileWriter(jc, FileSystem

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Thu Jan 21 10:37:58 2010
@@ -33,9 +33,9 @@
 public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>
     implements RecordReader<LongWritable, BytesRefArrayWritable> {
 
-  private Reader in;
-  private long start;
-  private long end;
+  private final Reader in;
+  private final long start;
+  private final long end;
   private boolean more = true;
   protected Configuration conf;
 
@@ -47,8 +47,9 @@
     this.end = split.getStart() + split.getLength();
     this.conf = conf;
 
-    if (split.getStart() > in.getPosition())
+    if (split.getStart() > in.getPosition()) {
       in.sync(split.getStart()); // sync to start
+    }
 
     this.start = in.getPosition();
     more = start < end;
@@ -76,8 +77,9 @@
   @Override
   public boolean next(LongWritable key, BytesRefArrayWritable value)
       throws IOException {
-    if (!more)
+    if (!more) {
       return false;
+    }
     long pos = in.getPosition();
     boolean hasMore = in.next(key);
     if (hasMore) {
@@ -96,8 +98,9 @@
   }
 
   protected boolean next(LongWritable key) throws IOException {
-    if (!more)
+    if (!more) {
       return false;
+    }
     long pos = in.getPosition();
     boolean hasMore = in.next(key);
     if (pos >= end && in.syncSeen()) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java Thu Jan 21 10:37:58 2010
@@ -31,8 +31,9 @@
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
       ArrayList<FileStatus> files) throws IOException {
-    if (files.size() <= 0)
+    if (files.size() <= 0) {
       return false;
+    }
     for (int fileId = 0; fileId < files.size(); fileId++) {
       try {
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, files.get(

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Thu Jan 21 10:37:58 2010
@@ -29,26 +29,29 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
- * base class for operator graph walker
- * this class takes list of starting ops and walks them one by one. it maintains list of walked
- * operators (dispatchedList) and a list of operators that are discovered but not yet dispatched
+ * base class for operator graph walker this class takes list of starting ops
+ * and walks them one by one. it maintains list of walked operators
+ * (dispatchedList) and a list of operators that are discovered but not yet
+ * dispatched
  */
 public class DefaultGraphWalker implements GraphWalker {
 
   protected Stack<Node> opStack;
-  private List<Node> toWalk = new ArrayList<Node>();
-  private Set<Node> seenList = new HashSet<Node>();
-  private HashMap<Node, Object> retMap = new HashMap<Node, Object>();
-  private Dispatcher dispatcher;
+  private final List<Node> toWalk = new ArrayList<Node>();
+  private final Set<Node> seenList = new HashSet<Node>();
+  private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+  private final Dispatcher dispatcher;
 
   /**
    * Constructor
-   * @param disp dispatcher to call for each op encountered
+   * 
+   * @param disp
+   *          dispatcher to call for each op encountered
    */
   public DefaultGraphWalker(Dispatcher disp) {
-    this.dispatcher = disp;
+    dispatcher = disp;
     opStack = new Stack<Node>();
- }
+  }
 
   /**
    * @return the toWalk
@@ -66,8 +69,11 @@
 
   /**
    * Dispatch the current operator
-   * @param nd node being walked
-   * @param ndStack stack of nodes encountered
+   * 
+   * @param nd
+   *          node being walked
+   * @param ndStack
+   *          stack of nodes encountered
    * @throws SemanticException
    */
   public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
@@ -75,22 +81,24 @@
     if (nd.getChildren() != null) {
       nodeOutputs = new Object[nd.getChildren().size()];
       int i = 0;
-      for(Node child: nd.getChildren()) {
+      for (Node child : nd.getChildren()) {
         nodeOutputs[i++] = retMap.get(child);
       }
     }
-    
+
     Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
     retMap.put(nd, retVal);
   }
 
   /**
    * starting point for walking
+   * 
    * @throws SemanticException
    */
-  public void startWalking(Collection<Node> startNodes, HashMap<Node, Object> nodeOutput) throws SemanticException {
+  public void startWalking(Collection<Node> startNodes,
+      HashMap<Node, Object> nodeOutput) throws SemanticException {
     toWalk.addAll(startNodes);
-    while(toWalk.size() > 0) {
+    while (toWalk.size() > 0) {
       Node nd = toWalk.remove(0);
       walk(nd);
       if (nodeOutput != null) {
@@ -101,19 +109,23 @@
 
   /**
    * walk the current operator and its descendants
-   * @param nd current operator in the graph
+   * 
+   * @param nd
+   *          current operator in the graph
    * @throws SemanticException
    */
   public void walk(Node nd) throws SemanticException {
-    if (opStack.empty() || nd != opStack.peek())
+    if (opStack.empty() || nd != opStack.peek()) {
       opStack.push(nd);
+    }
 
-    if((nd.getChildren() == null) 
+    if ((nd.getChildren() == null)
         || getDispatchedList().containsAll(nd.getChildren())) {
       // all children are done or no need to walk the children
-      if(getDispatchedList().contains(nd))
+      if (getDispatchedList().contains(nd)) {
         // sanity check
         assert false;
+      }
       dispatch(nd, opStack);
       opStack.pop();
       return;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java Thu Jan 21 10:37:58 2010
@@ -24,36 +24,44 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
- * Dispatches calls to relevant method in processor. The user registers various rules with the dispatcher, and
- * the processor corresponding to closest matching rule is fired.
+ * Dispatches calls to relevant method in processor. The user registers various
+ * rules with the dispatcher, and the processor corresponding to closest
+ * matching rule is fired.
  */
 public class DefaultRuleDispatcher implements Dispatcher {
-  
-  private Map<Rule, NodeProcessor>  procRules;
-  private NodeProcessorCtx          procCtx;
-  private NodeProcessor             defaultProc;
+
+  private final Map<Rule, NodeProcessor> procRules;
+  private final NodeProcessorCtx procCtx;
+  private final NodeProcessor defaultProc;
 
   /**
    * constructor
-   * @param defaultProc default processor to be fired if no rule matches
-   * @param rules operator processor that handles actual processing of the node
-   * @param procCtx operator processor context, which is opaque to the dispatcher
+   * 
+   * @param defaultProc
+   *          default processor to be fired if no rule matches
+   * @param rules
+   *          operator processor that handles actual processing of the node
+   * @param procCtx
+   *          operator processor context, which is opaque to the dispatcher
    */
-  public DefaultRuleDispatcher(NodeProcessor defaultProc, 
-                               Map<Rule, NodeProcessor> rules, NodeProcessorCtx procCtx) {
+  public DefaultRuleDispatcher(NodeProcessor defaultProc,
+      Map<Rule, NodeProcessor> rules, NodeProcessorCtx procCtx) {
     this.defaultProc = defaultProc;
-    this.procRules = rules;
-    this.procCtx   = procCtx;
+    procRules = rules;
+    this.procCtx = procCtx;
   }
 
   /**
    * dispatcher function
-   * @param nd operator to process
-   * @param ndStack the operators encountered so far
+   * 
+   * @param nd
+   *          operator to process
+   * @param ndStack
+   *          the operators encountered so far
    * @throws SemanticException
    */
-  public Object dispatch(Node nd, Stack<Node> ndStack, Object... nodeOutputs) 
-    throws SemanticException {
+  public Object dispatch(Node nd, Stack<Node> ndStack, Object... nodeOutputs)
+      throws SemanticException {
 
     // find the firing rule
     // find the rule from the stack specified
@@ -69,17 +77,18 @@
 
     NodeProcessor proc;
 
-    if (rule == null)
+    if (rule == null) {
       proc = defaultProc;
-    else
+    } else {
       proc = procRules.get(rule);
+    }
 
     // Do nothing in case proc is null
     if (proc != null) {
       // Call the process function
       return proc.process(nd, ndStack, procCtx, nodeOutputs);
-    }
-    else
+    } else {
       return null;
+    }
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java Thu Jan 21 10:37:58 2010
@@ -23,20 +23,24 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
- * Dispatcher interface for Operators
- * Used in operator graph walking to dispatch process/visitor functions for operators
+ * Dispatcher interface for Operators Used in operator graph walking to dispatch
+ * process/visitor functions for operators
  */
 public interface Dispatcher {
 
   /**
    * Dispatcher function.
-   * @param nd operator to process.
-   * @param stack operator stack to process.
-   * @param nodeOutputs The argument list of outputs from processing other nodes that are
-   * passed to this dispatcher from the walker.
+   * 
+   * @param nd
+   *          operator to process.
+   * @param stack
+   *          operator stack to process.
+   * @param nodeOutputs
+   *          The argument list of outputs from processing other nodes that are
+   *          passed to this dispatcher from the walker.
    * @return Object The return object from the processing call.
    * @throws SemanticException
    */
-  public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) 
-    throws SemanticException;
+  public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+      throws SemanticException;
 }