You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [7/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Jan 21 10:37:58 2010
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -46,16 +45,19 @@
 import org.apache.hadoop.io.Writable;
 
 /**
- * Map operator. This triggers overall map side processing.
- * This is a little different from regular operators in that
- * it starts off by processing a Writable data structure from
- * a Table (instead of a Hive Object).
+ * Map operator. This triggers overall map side processing. This is a little
+ * different from regular operators in that it starts off by processing a
+ * Writable data structure from a Table (instead of a Hive Object).
  **/
-public class MapOperator extends Operator <mapredWork> implements Serializable {
+public class MapOperator extends Operator<mapredWork> implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  public static enum Counter {DESERIALIZE_ERRORS}
-  transient private LongWritable deserialize_error_count = new LongWritable ();
+
+  public static enum Counter {
+    DESERIALIZE_ERRORS
+  }
+
+  transient private final LongWritable deserialize_error_count = new LongWritable();
   transient private Deserializer deserializer;
 
   transient private Object[] rowWithPart;
@@ -65,7 +67,7 @@
 
   private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
 
-  private java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
+  private final java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
 
   private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
 
@@ -86,27 +88,31 @@
       this.op = op;
     }
 
+    @Override
     public boolean equals(Object o) {
       if (o instanceof MapInputPath) {
-        MapInputPath mObj = (MapInputPath)o;
-        if (mObj == null)
+        MapInputPath mObj = (MapInputPath) o;
+        if (mObj == null) {
           return false;
-        return path.equals(mObj.path) && alias.equals(mObj.alias) && op.equals(mObj.op);
+        }
+        return path.equals(mObj.path) && alias.equals(mObj.alias)
+            && op.equals(mObj.op);
       }
 
       return false;
     }
 
+    @Override
     public int hashCode() {
       return (op == null) ? 0 : op.hashCode();
     }
   }
 
   private static class MapOpCtx {
-    boolean               isPartitioned;
+    boolean isPartitioned;
     StructObjectInspector rowObjectInspector;
-    Object[]              rowWithPart;
-    Deserializer          deserializer;
+    Object[] rowWithPart;
+    Deserializer deserializer;
     public String tableName;
     public String partName;
 
@@ -116,7 +122,8 @@
      * @param rowWithPart
      */
     public MapOpCtx(boolean isPartitioned,
-        StructObjectInspector rowObjectInspector, Object[] rowWithPart, Deserializer deserializer) {
+        StructObjectInspector rowObjectInspector, Object[] rowWithPart,
+        Deserializer deserializer) {
       this.isPartitioned = isPartitioned;
       this.rowObjectInspector = rowObjectInspector;
       this.rowWithPart = rowWithPart;
@@ -153,78 +160,89 @@
   }
 
   /**
-   * Initializes this map op as the root of the tree. It sets JobConf & MapRedWork
-   * and starts initialization of the operator tree rooted at this op.
+   * Initializes this map op as the root of the tree. It sets JobConf &
+   * MapRedWork and starts initialization of the operator tree rooted at this
+   * op.
+   * 
    * @param hconf
    * @param mrwork
    * @throws HiveException
    */
-  public void initializeAsRoot(Configuration hconf, mapredWork mrwork) throws HiveException {
+  public void initializeAsRoot(Configuration hconf, mapredWork mrwork)
+      throws HiveException {
     setConf(mrwork);
     setChildren(hconf);
     initialize(hconf, null);
   }
 
-  private static MapOpCtx initObjectInspector(mapredWork conf, Configuration hconf, String onefile)
-    throws HiveException, ClassNotFoundException, InstantiationException, IllegalAccessException, SerDeException {
+  private static MapOpCtx initObjectInspector(mapredWork conf,
+      Configuration hconf, String onefile) throws HiveException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException,
+      SerDeException {
     partitionDesc td = conf.getPathToPartitionInfo().get(onefile);
     LinkedHashMap<String, String> partSpec = td.getPartSpec();
     Properties tblProps = td.getProperties();
 
     Class sdclass = td.getDeserializerClass();
-    if(sdclass == null) {
+    if (sdclass == null) {
       String className = td.getSerdeClassName();
       if ((className == "") || (className == null)) {
-        throw new HiveException("SerDe class or the SerDe class name is not set for table: "
-            + td.getProperties().getProperty("name"));
+        throw new HiveException(
+            "SerDe class or the SerDe class name is not set for table: "
+                + td.getProperties().getProperty("name"));
       }
       sdclass = hconf.getClassByName(className);
     }
 
     String tableName = String.valueOf(tblProps.getProperty("name"));
     String partName = String.valueOf(partSpec);
-    //HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, tableName);
-    //HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, partName);
+    // HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, tableName);
+    // HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, partName);
     Deserializer deserializer = (Deserializer) sdclass.newInstance();
     deserializer.initialize(hconf, tblProps);
-    StructObjectInspector rowObjectInspector = (StructObjectInspector)deserializer.getObjectInspector();
+    StructObjectInspector rowObjectInspector = (StructObjectInspector) deserializer
+        .getObjectInspector();
 
     MapOpCtx opCtx = null;
     // Next check if this table has partitions and if so
     // get the list of partition names as well as allocate
     // the serdes for the partition columns
-    String pcols = tblProps.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
-    //Log LOG = LogFactory.getLog(MapOperator.class.getName());
+    String pcols = tblProps
+        .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+    // Log LOG = LogFactory.getLog(MapOperator.class.getName());
     if (pcols != null && pcols.length() > 0) {
       String[] partKeys = pcols.trim().split("/");
       List<String> partNames = new ArrayList<String>(partKeys.length);
       Object[] partValues = new Object[partKeys.length];
-      List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-      for(int i = 0; i < partKeys.length; i++ ) {
+      List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(
+          partKeys.length);
+      for (int i = 0; i < partKeys.length; i++) {
         String key = partKeys[i];
         partNames.add(key);
         // Partitions do not exist for this table
-        if (partSpec == null)
+        if (partSpec == null) {
           partValues[i] = new Text();
-        else
+        } else {
           partValues[i] = new Text(partSpec.get(key));
-        partObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
+        }
+        partObjectInspectors
+            .add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
       }
       StructObjectInspector partObjectInspector = ObjectInspectorFactory
-                  .getStandardStructObjectInspector(partNames, partObjectInspectors);
+          .getStandardStructObjectInspector(partNames, partObjectInspectors);
 
       Object[] rowWithPart = new Object[2];
       rowWithPart[1] = partValues;
       rowObjectInspector = ObjectInspectorFactory
-                                .getUnionStructObjectInspector(
-                                    Arrays.asList(new StructObjectInspector[]{
-                                                    rowObjectInspector,
-                                                    partObjectInspector}));
-      //LOG.info("dump " + tableName + " " + partName + " " + rowObjectInspector.getTypeName());
+          .getUnionStructObjectInspector(Arrays
+              .asList(new StructObjectInspector[] { rowObjectInspector,
+                  partObjectInspector }));
+      // LOG.info("dump " + tableName + " " + partName + " " +
+      // rowObjectInspector.getTypeName());
       opCtx = new MapOpCtx(true, rowObjectInspector, rowWithPart, deserializer);
-    }
-    else {
-      //LOG.info("dump2 " + tableName + " " + partName + " " + rowObjectInspector.getTypeName());
+    } else {
+      // LOG.info("dump2 " + tableName + " " + partName + " " +
+      // rowObjectInspector.getTypeName());
       opCtx = new MapOpCtx(false, rowObjectInspector, null, deserializer);
     }
     opCtx.tableName = tableName;
@@ -236,10 +254,9 @@
 
     Path fpath = new Path((new Path(HiveConf.getVar(hconf,
         HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
-    ArrayList<Operator<? extends Serializable>> children =
-        new ArrayList<Operator<? extends Serializable>>();
+    ArrayList<Operator<? extends Serializable>> children = new ArrayList<Operator<? extends Serializable>>();
     opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
-    operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>> ();
+    operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>>();
 
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
 
@@ -256,17 +273,21 @@
               + fpath.toUri().getPath());
           MapInputPath inp = new MapInputPath(onefile, onealias, op);
           opCtxMap.put(inp, opCtx);
-          if(operatorToPaths.get(op) == null)
-          	operatorToPaths.put(op, new java.util.ArrayList<String>());
+          if (operatorToPaths.get(op) == null) {
+            operatorToPaths.put(op, new java.util.ArrayList<String>());
+          }
           operatorToPaths.get(op).add(onefile);
 
-          op.setParentOperators(new ArrayList<Operator<? extends Serializable>>());
+          op
+              .setParentOperators(new ArrayList<Operator<? extends Serializable>>());
           op.getParentOperators().add(this);
-          // check for the operators who will process rows coming to this Map Operator
+          // check for the operators who will process rows coming to this Map
+          // Operator
           if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
             children.add(op);
             childrenPaths.add(onefile);
-            LOG.info("dump " + op.getName() + " " + opCtxMap.get(inp).getRowObjectInspector().getTypeName());
+            LOG.info("dump " + op.getName() + " "
+                + opCtxMap.get(inp).getRowObjectInspector().getTypeName());
             if (!done) {
               deserializer = opCtxMap.get(inp).getDeserializer();
               isPartitioned = opCtxMap.get(inp).isPartitioned();
@@ -292,48 +313,55 @@
     }
   }
 
-
+  @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     // set that parent initialization is done and call initialize on children
     state = State.INIT;
     List<Operator<? extends Serializable>> children = getChildOperators();
 
     for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
-      // Add alias, table name, and partitions to hadoop conf so that their children will
+      // Add alias, table name, and partitions to hadoop conf so that their
+      // children will
       // inherit these
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, entry.getValue().tableName);
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry.getValue().partName);
-      MapInputPath  input = entry.getKey();
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME,
+          entry.getValue().tableName);
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry
+          .getValue().partName);
+      MapInputPath input = entry.getKey();
       Operator<? extends Serializable> op = input.op;
-      // op is not in the children list, so need to remember it and close it afterwards
-      if ( children.indexOf(op) == -1 ) {
-        if ( extraChildrenToClose == null ) {
+      // op is not in the children list, so need to remember it and close it
+      // afterwards
+      if (children.indexOf(op) == -1) {
+        if (extraChildrenToClose == null) {
           extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
         }
         extraChildrenToClose.add(op);
       }
 
-			// multiple input paths may corresponding the same operator (tree). The
-			// below logic is to avoid initialize one operator multiple times if there
-			// is one input path in this mapper's input paths.
+      // multiple input paths may corresponding the same operator (tree). The
+      // below logic is to avoid initialize one operator multiple times if there
+      // is one input path in this mapper's input paths.
       boolean shouldInit = true;
       List<String> paths = operatorToPaths.get(op);
-      for(String path: paths) {
-      	if(childrenPaths.contains(path) && !path.equals(input.path)) {
-      		shouldInit = false;
-      		break;
-      	}
+      for (String path : paths) {
+        if (childrenPaths.contains(path) && !path.equals(input.path)) {
+          shouldInit = false;
+          break;
+        }
+      }
+      if (shouldInit) {
+        op.initialize(hconf, new ObjectInspector[] { entry.getValue()
+            .getRowObjectInspector() });
       }
-      if(shouldInit)
-      	op.initialize(hconf, new ObjectInspector[]{entry.getValue().getRowObjectInspector()});
     }
   }
 
   /**
    * close extra child operators that are initialized but are not executed.
    */
+  @Override
   public void closeOp(boolean abort) throws HiveException {
-    if ( extraChildrenToClose != null ) {
+    if (extraChildrenToClose != null) {
       for (Operator<? extends Serializable> op : extraChildrenToClose) {
         op.close(abort);
       }
@@ -351,16 +379,17 @@
       }
     } catch (SerDeException e) {
       // TODO: policy on deserialization errors
-      deserialize_error_count.set(deserialize_error_count.get()+1);
-      throw new HiveException (e);
+      deserialize_error_count.set(deserialize_error_count.get() + 1);
+      throw new HiveException(e);
     }
   }
 
-  public void processOp(Object row, int tag)
-      throws HiveException {
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
     throw new HiveException("Hive 2 Internal error: should not be called!");
   }
 
+  @Override
   public String getName() {
     return "MAP";
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Thu Jan 21 10:37:58 2010
@@ -35,22 +35,23 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
- * Alternate implementation (to ExecDriver) of spawning a mapreduce task that runs it from
- * a separate jvm. The primary issue with this is the inability to control logging from
- * a separate jvm in a consistent manner
+ * Alternate implementation (to ExecDriver) of spawning a mapreduce task that
+ * runs it from a separate jvm. The primary issue with this is the inability to
+ * control logging from a separate jvm in a consistent manner
  **/
 public class MapRedTask extends Task<mapredWork> implements Serializable {
-    
+
   private static final long serialVersionUID = 1L;
 
   final static String hadoopMemKey = "HADOOP_HEAPSIZE";
   final static String hadoopOptsKey = "HADOOP_OPTS";
-  final static String HIVE_SYS_PROP[] = {"build.dir", "build.dir.hive"}; 
-  
+  final static String HIVE_SYS_PROP[] = { "build.dir", "build.dir.hive" };
+
   public MapRedTask() {
     super();
   }
-  
+
+  @Override
   public int execute() {
 
     try {
@@ -60,7 +61,8 @@
 
       String libJarsOption;
       {
-        String addedJars = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.JAR);
+        String addedJars = ExecDriver.getResourceFiles(conf,
+            SessionState.ResourceType.JAR);
         conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
 
         String auxJars = conf.getAuxJars();
@@ -76,14 +78,14 @@
             libJarsOption = " -libjars " + addedJars + " ";
           } else {
             libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
-          }   
+          }
         }
       }
 
       // Generate the hiveConfArgs after potentially adding the jars
       String hiveConfArgs = ExecDriver.generateCmdLine(conf);
       File scratchDir = new File(conf.getVar(HiveConf.ConfVars.SCRATCHDIR));
-      
+
       mapredWork plan = getWork();
 
       File planFile = File.createTempFile("plan", ".xml", scratchDir);
@@ -91,21 +93,22 @@
       FileOutputStream out = new FileOutputStream(planFile);
       Utilities.serializeMapRedWork(plan, out);
 
-      String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent"))
-                        ? "-silent" : "";
+      String isSilent = "true".equalsIgnoreCase(System
+          .getProperty("test.silent")) ? "-silent" : "";
 
       String jarCmd;
-      if(ShimLoader.getHadoopShims().usesJobShell()) {
+      if (ShimLoader.getHadoopShims().usesJobShell()) {
         jarCmd = libJarsOption + hiveJar + " " + ExecDriver.class.getName();
       } else {
         jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
       }
 
-      String cmdLine = hadoopExec + " jar " + jarCmd + 
-        " -plan " + planFile.toString() + " " + isSilent + " " + hiveConfArgs; 
-      
-      String files = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.FILE);
-      if(!files.isEmpty()) {
+      String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
+          + planFile.toString() + " " + isSilent + " " + hiveConfArgs;
+
+      String files = ExecDriver.getResourceFiles(conf,
+          SessionState.ResourceType.FILE);
+      if (!files.isEmpty()) {
         cmdLine = cmdLine + " -files " + files;
       }
 
@@ -117,63 +120,65 @@
       {
         StringBuilder sb = new StringBuilder();
         Properties p = System.getProperties();
-        for (int k = 0; k < HIVE_SYS_PROP.length; k++) {
-          if (p.containsKey(HIVE_SYS_PROP[k])) {
-            sb.append(" -D" + HIVE_SYS_PROP[k] + "=" + p.getProperty(HIVE_SYS_PROP[k]));
+        for (String element : HIVE_SYS_PROP) {
+          if (p.containsKey(element)) {
+            sb.append(" -D" + element + "=" + p.getProperty(element));
           }
         }
         hadoopOpts = sb.toString();
       }
-      
+
       // Inherit the environment variables
       String[] env;
       {
         Map<String, String> variables = new HashMap(System.getenv());
         // The user can specify the hadoop memory
         int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
-        
+
         if (hadoopMem == 0) {
           variables.remove(hadoopMemKey);
         } else {
           // user specified the memory - only applicable for local mode
           variables.put(hadoopMemKey, String.valueOf(hadoopMem));
         }
-        
+
         if (variables.containsKey(hadoopOptsKey)) {
-          variables.put(hadoopOptsKey, variables.get(hadoopOptsKey) + hadoopOpts);
+          variables.put(hadoopOptsKey, variables.get(hadoopOptsKey)
+              + hadoopOpts);
         } else {
           variables.put(hadoopOptsKey, hadoopOpts);
         }
-        
+
         env = new String[variables.size()];
         int pos = 0;
-        for (Map.Entry<String, String> entry : variables.entrySet()) {  
-          String name = entry.getKey();  
-          String value = entry.getValue();  
-          env[pos++] = name + "=" + value;  
-        }  
+        for (Map.Entry<String, String> entry : variables.entrySet()) {
+          String name = entry.getKey();
+          String value = entry.getValue();
+          env[pos++] = name + "=" + value;
+        }
       }
-      
+
       // Run ExecDriver in another JVM
       executor = Runtime.getRuntime().exec(cmdLine, env);
 
-      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
-      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
-      
+      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+          null, System.out);
+      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+          null, System.err);
+
       outPrinter.start();
       errPrinter.start();
-    
+
       int exitVal = executor.waitFor();
 
-      if(exitVal != 0) {
+      if (exitVal != 0) {
         LOG.error("Execution failed with exit status: " + exitVal);
       } else {
         LOG.info("Execution completed successfully");
       }
 
       return exitVal;
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
       LOG.error("Exception: " + e.getMessage());
       return (1);
@@ -190,7 +195,8 @@
     mapredWork w = getWork();
     return w.getReducer() != null;
   }
-  
+
+  @Override
   public int getType() {
     return StageType.MAPREDLOCAL;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Jan 21 10:37:58 2010
@@ -51,6 +51,7 @@
     super();
   }
 
+  @Override
   public int execute() {
 
     try {
@@ -64,39 +65,44 @@
         if (lfd.getIsDfsDir()) {
           // Just do a rename on the URIs, they belong to the same FS
           String mesg = "Moving data to: " + lfd.getTargetDir();
-          String mesg_detail = " from " +  lfd.getSourceDir();
+          String mesg_detail = " from " + lfd.getSourceDir();
           console.printInfo(mesg, mesg_detail);
 
           // delete the output directory if it already exists
           fs.delete(targetPath, true);
           // if source exists, rename. Otherwise, create a empty directory
           if (fs.exists(sourcePath)) {
-            if (!fs.rename(sourcePath, targetPath))
-              throw new HiveException ("Unable to rename: " + sourcePath + " to: "
-                                       + targetPath);
-          } else
-            if (!fs.mkdirs(targetPath))
-              throw new HiveException ("Unable to make directory: " + targetPath);
+            if (!fs.rename(sourcePath, targetPath)) {
+              throw new HiveException("Unable to rename: " + sourcePath
+                  + " to: " + targetPath);
+            }
+          } else if (!fs.mkdirs(targetPath)) {
+            throw new HiveException("Unable to make directory: " + targetPath);
+          }
         } else {
           // This is a local file
           String mesg = "Copying data to local directory " + lfd.getTargetDir();
-          String mesg_detail =  " from " + lfd.getSourceDir();
+          String mesg_detail = " from " + lfd.getSourceDir();
           console.printInfo(mesg, mesg_detail);
 
           // delete the existing dest directory
           LocalFileSystem dstFs = FileSystem.getLocal(conf);
 
-          if(dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
+          if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
             console.printInfo(mesg, mesg_detail);
             // if source exists, rename. Otherwise, create a empty directory
-            if (fs.exists(sourcePath))
+            if (fs.exists(sourcePath)) {
               fs.copyToLocalFile(sourcePath, targetPath);
-            else {
-              if (!dstFs.mkdirs(targetPath))
-                throw new HiveException ("Unable to make local directory: " + targetPath);
+            } else {
+              if (!dstFs.mkdirs(targetPath)) {
+                throw new HiveException("Unable to make local directory: "
+                    + targetPath);
+              }
             }
           } else {
-            throw new AccessControlException("Unable to delete the existing destination directory: " + targetPath);
+            throw new AccessControlException(
+                "Unable to delete the existing destination directory: "
+                    + targetPath);
           }
         }
       }
@@ -104,55 +110,69 @@
       // Next we do this for tables and partitions
       loadTableDesc tbd = work.getLoadTableWork();
       if (tbd != null) {
-        String mesg = "Loading data to table " + tbd.getTable().getTableName() +
-        ((tbd.getPartitionSpec().size() > 0) ?
-            " partition " + tbd.getPartitionSpec().toString() : "");
+        String mesg = "Loading data to table "
+            + tbd.getTable().getTableName()
+            + ((tbd.getPartitionSpec().size() > 0) ? " partition "
+                + tbd.getPartitionSpec().toString() : "");
         String mesg_detail = " from " + tbd.getSourceDir();
         console.printInfo(mesg, mesg_detail);
-        Table table = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd.getTable().getTableName());
+        Table table = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd
+            .getTable().getTableName());
 
         if (work.getCheckFileFormat()) {
           // Get all files from the src directory
-          FileStatus [] dirs;
+          FileStatus[] dirs;
           ArrayList<FileStatus> files;
           FileSystem fs;
           try {
-            fs = FileSystem.get(table.getDataLocation(),conf);
+            fs = FileSystem.get(table.getDataLocation(), conf);
             dirs = fs.globStatus(new Path(tbd.getSourceDir()));
             files = new ArrayList<FileStatus>();
-            for (int i=0; (dirs != null && i<dirs.length); i++) {
+            for (int i = 0; (dirs != null && i < dirs.length); i++) {
               files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
-              // We only check one file, so exit the loop when we have at least one.
-              if (files.size()>0) break;
+              // We only check one file, so exit the loop when we have at least
+              // one.
+              if (files.size() > 0) {
+                break;
+              }
             }
           } catch (IOException e) {
-            throw new HiveException("addFiles: filesystem error in check phase", e);
+            throw new HiveException(
+                "addFiles: filesystem error in check phase", e);
           }
 
           // Check if the file format of the file matches that of the table.
-          boolean flag = HiveFileFormatUtils.checkInputFormat(fs, conf, tbd.getTable().getInputFileFormatClass(), files);
-          if(!flag)
-            throw new HiveException("Wrong file format. Please check the file's format.");
+          boolean flag = HiveFileFormatUtils.checkInputFormat(fs, conf, tbd
+              .getTable().getInputFileFormatClass(), files);
+          if (!flag) {
+            throw new HiveException(
+                "Wrong file format. Please check the file's format.");
+          }
         }
 
-        if(tbd.getPartitionSpec().size() == 0) {
-          db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));
-          if (work.getOutputs() != null)
+        if (tbd.getPartitionSpec().size() == 0) {
+          db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
+              .getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));
+          if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(table));
+          }
         } else {
           LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
-          db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
-              tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
-          Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
-          if (work.getOutputs() != null)
+          db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable()
+              .getTableName(), tbd.getPartitionSpec(), tbd.getReplace(),
+              new Path(tbd.getTmpDir()));
+          Partition partn = db.getPartition(table, tbd.getPartitionSpec(),
+              false);
+          if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(partn));
+          }
         }
       }
 
       return 0;
-    }
-    catch (Exception e) {
-      console.printError("Failed with exception " +   e.getMessage(), "\n" + StringUtils.stringifyException(e));
+    } catch (Exception e) {
+      console.printError("Failed with exception " + e.getMessage(), "\n"
+          + StringUtils.stringifyException(e));
       return (1);
     }
   }
@@ -162,21 +182,23 @@
    */
   public boolean isLocal() {
     loadTableDesc tbd = work.getLoadTableWork();
-    if (tbd != null)
+    if (tbd != null) {
       return false;
+    }
 
     loadFileDesc lfd = work.getLoadFileWork();
     if (lfd != null) {
       if (lfd.getIsDfsDir()) {
         return false;
-      }
-      else
+      } else {
         return true;
+      }
     }
 
     return false;
   }
 
+  @Override
   public int getType() {
     return StageType.MOVE;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java Thu Jan 21 10:37:58 2010
@@ -23,23 +23,20 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
- * The class implements the method resolution for operators like 
- * (+, -, *, %). The resolution logic is as follows:
+ * The class implements the method resolution for operators like (+, -, *, %).
+ * The resolution logic is as follows:
  * 
- * 1. If one of the parameters is a string, then it resolves to
- *    evaluate(double, double)
- * 2. If one of the parameters is null, then it resolves to evaluate(T, T)
- *    where T is the other non-null parameter type.
- * 3. If both of the parameters are null, then it resolves to 
- *    evaluate(byte, byte)
- * 4. Otherwise, it resolves to evaluate(T, T), where T is the type resulting
- *    from calling FunctionRegistry.getCommonClass() on the two arguments.
+ * 1. If one of the parameters is a string, then it resolves to evaluate(double,
+ * double) 2. If one of the parameters is null, then it resolves to evaluate(T,
+ * T) where T is the other non-null parameter type. 3. If both of the parameters
+ * are null, then it resolves to evaluate(byte, byte) 4. Otherwise, it resolves
+ * to evaluate(T, T), where T is the type resulting from calling
+ * FunctionRegistry.getCommonClass() on the two arguments.
  */
 public class NumericOpMethodResolver implements UDFMethodResolver {
 
@@ -47,65 +44,68 @@
    * The udfclass for which resolution is needed.
    */
   Class<? extends UDF> udfClass;
-  
+
   /**
    * Constuctor.
    */
   public NumericOpMethodResolver(Class<? extends UDF> udfClass) {
     this.udfClass = udfClass;
   }
-  
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hive.ql.exec.UDFMethodResolver#getEvalMethod(java.util.List)
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.hive.ql.exec.UDFMethodResolver#getEvalMethod(java.util
+   * .List)
    */
   @Override
   public Method getEvalMethod(List<TypeInfo> argTypeInfos)
-  throws AmbiguousMethodException, UDFArgumentException {
-    assert(argTypeInfos.size() == 2);
+      throws AmbiguousMethodException, UDFArgumentException {
+    assert (argTypeInfos.size() == 2);
 
     List<TypeInfo> pTypeInfos = null;
     List<TypeInfo> modArgTypeInfos = new ArrayList<TypeInfo>();
 
     // If either argument is a string, we convert to a double because a number
     // in string form should always be convertible into a double
-    if (argTypeInfos.get(0).equals(TypeInfoFactory.stringTypeInfo) ||
-        argTypeInfos.get(1).equals(TypeInfoFactory.stringTypeInfo) ) {
+    if (argTypeInfos.get(0).equals(TypeInfoFactory.stringTypeInfo)
+        || argTypeInfos.get(1).equals(TypeInfoFactory.stringTypeInfo)) {
       modArgTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
       modArgTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
     } else {
       // If it's a void, we change the type to a byte because once the types
       // are run through getCommonClass(), a byte and any other type T will
       // resolve to type T
-      for(int i=0; i<2; i++) {
-        if(argTypeInfos.get(i).equals(TypeInfoFactory.voidTypeInfo)) {
-          modArgTypeInfos.add(TypeInfoFactory.byteTypeInfo); 
+      for (int i = 0; i < 2; i++) {
+        if (argTypeInfos.get(i).equals(TypeInfoFactory.voidTypeInfo)) {
+          modArgTypeInfos.add(TypeInfoFactory.byteTypeInfo);
         } else {
           modArgTypeInfos.add(argTypeInfos.get(i));
         }
       }
     }
-    
-    TypeInfo commonType = FunctionRegistry.getCommonClass(
-        modArgTypeInfos.get(0), 
-        modArgTypeInfos.get(1));
-    
-    if(commonType == null) {
-      throw new UDFArgumentException("Unable to find a common class between" +
-      		"types " + modArgTypeInfos.get(0).getTypeName() + 
-      		" and " + modArgTypeInfos.get(1).getTypeName());
+
+    TypeInfo commonType = FunctionRegistry.getCommonClass(modArgTypeInfos
+        .get(0), modArgTypeInfos.get(1));
+
+    if (commonType == null) {
+      throw new UDFArgumentException("Unable to find a common class between"
+          + "types " + modArgTypeInfos.get(0).getTypeName() + " and "
+          + modArgTypeInfos.get(1).getTypeName());
     }
-    
+
     pTypeInfos = new ArrayList<TypeInfo>();
     pTypeInfos.add(commonType);
     pTypeInfos.add(commonType);
 
     Method udfMethod = null;
 
-    for(Method m: Arrays.asList(udfClass.getMethods())) {
+    for (Method m : Arrays.asList(udfClass.getMethods())) {
       if (m.getName().equals("evaluate")) {
 
-        List<TypeInfo> argumentTypeInfos = TypeInfoUtils.getParameterTypeInfos(m,
-            pTypeInfos.size());
+        List<TypeInfo> argumentTypeInfos = TypeInfoUtils.getParameterTypeInfos(
+            m, pTypeInfos.size());
         if (argumentTypeInfos == null) {
           // null means the method does not accept number of arguments passed.
           continue;
@@ -113,7 +113,7 @@
 
         boolean match = (argumentTypeInfos.size() == pTypeInfos.size());
 
-        for(int i=0; i<pTypeInfos.size() && match; i++) {
+        for (int i = 0; i < pTypeInfos.size() && match; i++) {
           TypeInfo accepted = argumentTypeInfos.get(i);
           if (!accepted.equals(pTypeInfos.get(i))) {
             match = false;
@@ -123,13 +123,12 @@
         if (match) {
           if (udfMethod != null) {
             throw new AmbiguousMethodException(udfClass, argTypeInfos);
-          }
-          else {
+          } else {
             udfMethod = m;
           }
         }
       }
     }
-    return udfMethod;      
+    return udfMethod;
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java Thu Jan 21 10:37:58 2010
@@ -19,7 +19,8 @@
 package org.apache.hadoop.hive.ql.exec;
 
 /**
- * Base class of numeric UDAFs like sum and avg which need a NumericUDAFEvaluatorResolver.
+ * Base class of numeric UDAFs like sum and avg which need a
+ * NumericUDAFEvaluatorResolver.
  */
 public class NumericUDAF extends UDAF {
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java Thu Jan 21 10:37:58 2010
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.sql.Date;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -26,8 +25,9 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /**
- * Resolver for Numeric UDAFs like sum and avg. If the input argument is string or date,
- * the resolver returns the evaluator whose iterate function operates on doubles.
+ * Resolver for Numeric UDAFs like sum and avg. If the input argument is string
+ * or date, the resolver returns the evaluator whose iterate function operates
+ * on doubles.
  */
 public class NumericUDAFEvaluatorResolver extends DefaultUDAFEvaluatorResolver {
 
@@ -37,16 +37,21 @@
   public NumericUDAFEvaluatorResolver(Class<? extends UDAF> udafClass) {
     super(udafClass);
   }
-  
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hive.ql.exec.UDAFMethodResolver#getEvaluatorClass(java.util.List)
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.hive.ql.exec.UDAFMethodResolver#getEvaluatorClass(java
+   * .util.List)
    */
   @Override
   public Class<? extends UDAFEvaluator> getEvaluatorClass(
       List<TypeInfo> argTypeInfos) throws AmbiguousMethodException {
-    // Go through the argClasses and for any string, void or date time, start looking for doubles
+    // Go through the argClasses and for any string, void or date time, start
+    // looking for doubles
     ArrayList<TypeInfo> args = new ArrayList<TypeInfo>();
-    for(TypeInfo arg: argTypeInfos) {
+    for (TypeInfo arg : argTypeInfos) {
       if (arg.equals(TypeInfoFactory.voidTypeInfo)
           || arg.equals(TypeInfoFactory.stringTypeInfo)) {
         args.add(TypeInfoFactory.doubleTypeInfo);
@@ -54,7 +59,7 @@
         args.add(arg);
       }
     }
-    
+
     return super.getEvaluatorClass(args);
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Jan 21 10:37:58 2010
@@ -45,7 +45,8 @@
 /**
  * Base operator implementation
  **/
-public abstract class Operator <T extends Serializable> implements Serializable, Node {
+public abstract class Operator<T extends Serializable> implements Serializable,
+    Node {
 
   // Bean methods
 
@@ -55,42 +56,44 @@
   protected List<Operator<? extends Serializable>> parentOperators;
   protected String operatorId;
   /**
-   * List of counter names associated with the operator
-   * It contains the following default counters
-   *   NUM_INPUT_ROWS
-   *   NUM_OUTPUT_ROWS
-   *   TIME_TAKEN
+   * List of counter names associated with the operator It contains the
+   * following default counters NUM_INPUT_ROWS NUM_OUTPUT_ROWS TIME_TAKEN
    * Individual operators can add to this list via addToCounterNames methods
    */
   protected ArrayList<String> counterNames;
 
   /**
    * Each operator has its own map of its counter names to disjoint
-   * ProgressCounter - it is populated at compile time and is read in
-   * at run-time while extracting the operator specific counts
+   * ProgressCounter - it is populated at compile time and is read in at
+   * run-time while extracting the operator specific counts
    */
   protected HashMap<String, ProgressCounter> counterNameToEnum;
 
-
   private static int seqId;
 
-  // It can be optimized later so that an operator operator (init/close) is performed
-  // only after that operation has been performed on all the parents. This will require
-  // initializing the whole tree in all the mappers (which might be required for mappers
+  // It can be optimized later so that an operator operator (init/close) is
+  // performed
+  // only after that operation has been performed on all the parents. This will
+  // require
+  // initializing the whole tree in all the mappers (which might be required for
+  // mappers
   // spanning multiple files anyway, in future)
   public static enum State {
-    UNINIT,   // initialize() has not been called
-    INIT,     // initialize() has been called and close() has not been called,
-              // or close() has been called but one of its parent is not closed.
-    CLOSE     // all its parents operators are in state CLOSE and called close()
-              // to children. Note: close() being called and its state being CLOSE is
-              // difference since close() could be called but state is not CLOSE if
-              // one of its parent is not in state CLOSE..
+    UNINIT, // initialize() has not been called
+    INIT, // initialize() has been called and close() has not been called,
+    // or close() has been called but one of its parent is not closed.
+    CLOSE
+    // all its parents operators are in state CLOSE and called close()
+    // to children. Note: close() being called and its state being CLOSE is
+    // difference since close() could be called but state is not CLOSE if
+    // one of its parent is not in state CLOSE..
   };
+
   transient protected State state = State.UNINIT;
 
-  transient static boolean fatalError = false; // fatalError is shared acorss all operators
-  
+  transient static boolean fatalError = false; // fatalError is shared acorss
+                                               // all operators
+
   static {
     seqId = 0;
   }
@@ -102,17 +105,20 @@
   public static void resetId() {
     seqId = 0;
   }
-  
+
   /**
    * Create an operator with a reporter.
-   * @param reporter Used to report progress of certain operators.
+   * 
+   * @param reporter
+   *          Used to report progress of certain operators.
    */
   public Operator(Reporter reporter) {
     this.reporter = reporter;
     id = String.valueOf(seqId++);
   }
 
-  public void setChildOperators(List<Operator<? extends Serializable>> childOperators) {
+  public void setChildOperators(
+      List<Operator<? extends Serializable>> childOperators) {
     this.childOperators = childOperators;
   }
 
@@ -130,14 +136,15 @@
     }
 
     Vector<Node> ret_vec = new Vector<Node>();
-    for(Operator<? extends Serializable> op: getChildOperators()) {
+    for (Operator<? extends Serializable> op : getChildOperators()) {
       ret_vec.add(op);
     }
 
     return ret_vec;
   }
 
-  public void setParentOperators(List<Operator<? extends Serializable>> parentOperators) {
+  public void setParentOperators(
+      List<Operator<? extends Serializable>> parentOperators) {
     this.parentOperators = parentOperators;
   }
 
@@ -178,7 +185,7 @@
 
   // non-bean ..
 
-  transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable> ();
+  transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
   transient protected OutputCollector out;
   transient protected Log LOG = LogFactory.getLog(this.getClass().getName());
   transient protected String alias;
@@ -190,9 +197,9 @@
   transient protected ObjectInspector outputObjInspector;
 
   /**
-   * A map of output column name to input expression map. This is used by optimizer
-   * and built during semantic analysis
-   * contains only key elements for reduce sink and group by op
+   * A map of output column name to input expression map. This is used by
+   * optimizer and built during semantic analysis contains only key elements for
+   * reduce sink and group by op
    */
   protected transient Map<String, exprNodeDesc> colExprMap;
 
@@ -201,21 +208,24 @@
   }
 
   /**
-   * This function is not named getId(), to make sure java serialization
-   * does NOT serialize it.  Some TestParse tests will fail if we serialize
-   * this field, since the Operator ID will change based on the number of
-   * query tests.
+   * This function is not named getId(), to make sure java serialization does
+   * NOT serialize it. Some TestParse tests will fail if we serialize this
+   * field, since the Operator ID will change based on the number of query
+   * tests.
    */
-  public String getIdentifier() { return id; }
+  public String getIdentifier() {
+    return id;
+  }
 
   public void setReporter(Reporter rep) {
     reporter = rep;
 
     // the collector is same across all operators
-    if(childOperators == null)
+    if (childOperators == null) {
       return;
+    }
 
-    for(Operator<? extends Serializable> op: childOperators) {
+    for (Operator<? extends Serializable> op : childOperators) {
       op.setReporter(rep);
     }
   }
@@ -224,10 +234,11 @@
     this.out = out;
 
     // the collector is same across all operators
-    if(childOperators == null)
+    if (childOperators == null) {
       return;
+    }
 
-    for(Operator<? extends Serializable> op: childOperators) {
+    for (Operator<? extends Serializable> op : childOperators) {
       op.setOutputCollector(out);
     }
   }
@@ -238,31 +249,34 @@
   public void setAlias(String alias) {
     this.alias = alias;
 
-    if(childOperators == null)
+    if (childOperators == null) {
       return;
+    }
 
-    for(Operator<? extends Serializable> op: childOperators) {
+    for (Operator<? extends Serializable> op : childOperators) {
       op.setAlias(alias);
     }
   }
 
   public Map<Enum<?>, Long> getStats() {
-    HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long> ();
-    for(Enum<?> one: statsMap.keySet()) {
+    HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
+    for (Enum<?> one : statsMap.keySet()) {
       ret.put(one, Long.valueOf(statsMap.get(one).get()));
     }
-    return(ret);
+    return (ret);
   }
 
   /**
    * checks whether all parent operators are initialized or not
-   * @return true if there are no parents or all parents are initialized. false otherwise
+   * 
+   * @return true if there are no parents or all parents are initialized. false
+   *         otherwise
    */
   protected boolean areAllParentsInitialized() {
     if (parentOperators == null) {
       return true;
     }
-    for(Operator<? extends Serializable> parent: parentOperators) {
+    for (Operator<? extends Serializable> parent : parentOperators) {
       if (parent.state != State.INIT) {
         return false;
       }
@@ -271,46 +285,51 @@
   }
 
   /**
-   * Initializes operators only if all parents have been initialized.
-   * Calls operator specific initializer which then initializes child ops.
-   *
+   * Initializes operators only if all parents have been initialized. Calls
+   * operator specific initializer which then initializes child ops.
+   * 
    * @param hconf
-   * @param inputOIs input object inspector array indexes by tag id. null value is ignored.
+   * @param inputOIs
+   *          input object inspector array indexes by tag id. null value is
+   *          ignored.
    * @throws HiveException
    */
-  public void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException {
+  public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
+      throws HiveException {
     if (state == State.INIT) {
       return;
     }
 
-    if(!areAllParentsInitialized()) {
+    if (!areAllParentsInitialized()) {
       return;
     }
-    
+
     LOG.info("Initializing Self " + id + " " + getName());
 
     if (inputOIs != null) {
       inputObjInspectors = inputOIs;
     }
 
-    // initialize structure to maintain child op info. operator tree changes while
+    // initialize structure to maintain child op info. operator tree changes
+    // while
     // initializing so this need to be done here instead of initialize() method
     if (childOperators != null) {
       childOperatorsArray = new Operator[childOperators.size()];
-      for (int i=0; i<childOperatorsArray.length; i++) {
+      for (int i = 0; i < childOperatorsArray.length; i++) {
         childOperatorsArray[i] = childOperators.get(i);
       }
       childOperatorsTag = new int[childOperatorsArray.length];
-      for (int i=0; i<childOperatorsArray.length; i++) {
-        List<Operator<? extends Serializable>> parentOperators =
-          childOperatorsArray[i].getParentOperators();
+      for (int i = 0; i < childOperatorsArray.length; i++) {
+        List<Operator<? extends Serializable>> parentOperators = childOperatorsArray[i]
+            .getParentOperators();
         if (parentOperators == null) {
           throw new HiveException("Hive internal error: parent is null in "
               + childOperatorsArray[i].getClass() + "!");
         }
         childOperatorsTag[i] = parentOperators.indexOf(this);
         if (childOperatorsTag[i] == -1) {
-          throw new HiveException("Hive internal error: cannot find parent in the child operator!");
+          throw new HiveException(
+              "Hive internal error: cannot find parent in the child operator!");
         }
       }
     }
@@ -333,7 +352,8 @@
   }
 
   /**
-   * Calls initialize on each of the children with outputObjetInspector as the output row format
+   * Calls initialize on each of the children with outputObjetInspector as the
+   * output row format
    */
   protected void initializeChildren(Configuration hconf) throws HiveException {
     state = State.INIT;
@@ -343,45 +363,59 @@
     }
     LOG.info("Initializing children of " + id + " " + getName());
     for (int i = 0; i < childOperatorsArray.length; i++) {
-      childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]);
-      if ( reporter != null ) {
+      childOperatorsArray[i].initialize(hconf, outputObjInspector,
+          childOperatorsTag[i]);
+      if (reporter != null) {
         childOperatorsArray[i].setReporter(reporter);
       }
     }
   }
 
   /**
-   * Collects all the parent's output object inspectors and calls actual initialization method
+   * Collects all the parent's output object inspectors and calls actual
+   * initialization method
+   * 
    * @param hconf
-   * @param inputOI OI of the row that this parent will pass to this op
-   * @param parentId parent operator id
+   * @param inputOI
+   *          OI of the row that this parent will pass to this op
+   * @param parentId
+   *          parent operator id
    * @throws HiveException
    */
-  private void initialize(Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException {
+  private void initialize(Configuration hconf, ObjectInspector inputOI,
+      int parentId) throws HiveException {
     LOG.info("Initializing child " + id + " " + getName());
     inputObjInspectors[parentId] = inputOI;
     // call the actual operator initialization function
     initialize(hconf, null);
   }
 
-
-   /**
+  /**
    * Process the row.
-   * @param row  The object representing the row.
-   * @param tag  The tag of the row usually means which parent this row comes from.
-   *             Rows with the same tag should have exactly the same rowInspector all the time.
+   * 
+   * @param row
+   *          The object representing the row.
+   * @param tag
+   *          The tag of the row usually means which parent this row comes from.
+   *          Rows with the same tag should have exactly the same rowInspector
+   *          all the time.
    */
   public abstract void processOp(Object row, int tag) throws HiveException;
 
   /**
    * Process the row.
-   * @param row  The object representing the row.
-   * @param tag  The tag of the row usually means which parent this row comes from.
-   *             Rows with the same tag should have exactly the same rowInspector all the time.
+   * 
+   * @param row
+   *          The object representing the row.
+   * @param tag
+   *          The tag of the row usually means which parent this row comes from.
+   *          Rows with the same tag should have exactly the same rowInspector
+   *          all the time.
    */
   public void process(Object row, int tag) throws HiveException {
-    if ( fatalError ) 
+    if (fatalError) {
       return;
+    }
     preProcessCounter();
     processOp(row, tag);
     postProcessCounter();
@@ -391,15 +425,18 @@
   public void startGroup() throws HiveException {
     LOG.debug("Starting group");
 
-    if (childOperators == null)
+    if (childOperators == null) {
       return;
-    
-    if ( fatalError )
+    }
+
+    if (fatalError) {
       return;
+    }
 
     LOG.debug("Starting group for children:");
-    for (Operator<? extends Serializable> op: childOperators)
+    for (Operator<? extends Serializable> op : childOperators) {
       op.startGroup();
+    }
 
     LOG.debug("Start group Done");
   }
@@ -408,24 +445,26 @@
   public void endGroup() throws HiveException {
     LOG.debug("Ending group");
 
-    if (childOperators == null)
+    if (childOperators == null) {
       return;
+    }
 
-    if ( fatalError )
+    if (fatalError) {
       return;
+    }
 
     LOG.debug("Ending group for children:");
-    for (Operator<? extends Serializable> op: childOperators)
+    for (Operator<? extends Serializable> op : childOperators) {
       op.endGroup();
+    }
 
     LOG.debug("End group Done");
   }
 
   private boolean allInitializedParentsAreClosed() {
     if (parentOperators != null) {
-      for(Operator<? extends Serializable> parent: parentOperators) {
-        if (!(parent.state == State.CLOSE ||
-              parent.state == State.UNINIT)) {
+      for (Operator<? extends Serializable> parent : parentOperators) {
+        if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) {
           return false;
         }
       }
@@ -438,12 +477,14 @@
   // more than 1 thread should call this close() function.
   public void close(boolean abort) throws HiveException {
 
-    if (state == State.CLOSE)
+    if (state == State.CLOSE) {
       return;
+    }
 
     // check if all parents are finished
-    if (!allInitializedParentsAreClosed())
+    if (!allInitializedParentsAreClosed()) {
       return;
+    }
 
     // set state as CLOSE as long as all parents are closed
     // state == CLOSE doesn't mean all children are also in state CLOSE
@@ -463,10 +504,11 @@
 
     try {
       logStats();
-      if(childOperators == null)
+      if (childOperators == null) {
         return;
+      }
 
-      for(Operator<? extends Serializable> op: childOperators) {
+      for (Operator<? extends Serializable> op : childOperators) {
         op.close(abort);
       }
 
@@ -478,33 +520,35 @@
   }
 
   /**
-   * Operator specific close routine. Operators which inherents this
-   * class should overwrite this funtion for their specific cleanup
-   * routine.
+   * Operator specific close routine. Operators which inherents this class
+   * should overwrite this funtion for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
   }
 
-
   /**
    * Unlike other operator interfaces which are called from map or reduce task,
    * jobClose is called from the jobclient side once the job has completed
-   *
-   * @param conf Configuration with with which job was submitted
-   * @param success whether the job was completed successfully or not
+   * 
+   * @param conf
+   *          Configuration with with which job was submitted
+   * @param success
+   *          whether the job was completed successfully or not
    */
-  public void jobClose(Configuration conf, boolean success) throws HiveException {
-    if(childOperators == null)
+  public void jobClose(Configuration conf, boolean success)
+      throws HiveException {
+    if (childOperators == null) {
       return;
+    }
 
-    for(Operator<? extends Serializable> op: childOperators) {
+    for (Operator<? extends Serializable> op : childOperators) {
       op.jobClose(conf, success);
     }
   }
 
   /**
-   *  Cache childOperators in an array for faster access. childOperatorsArray is accessed
-   *  per row, so it's important to make the access efficient.
+   * Cache childOperators in an array for faster access. childOperatorsArray is
+   * accessed per row, so it's important to make the access efficient.
    */
   transient protected Operator<? extends Serializable>[] childOperatorsArray = null;
   transient protected int[] childOperatorsTag;
@@ -513,54 +557,69 @@
   transient private long cntr = 0;
   transient private long nextCntr = 1;
 
-   /**
-   * Replace one child with another at the same position. The parent of the child is not changed
-   * @param child     the old child
-   * @param newChild  the new child
+  /**
+   * Replace one child with another at the same position. The parent of the
+   * child is not changed
+   * 
+   * @param child
+   *          the old child
+   * @param newChild
+   *          the new child
    */
-  public void  replaceChild(Operator<? extends Serializable> child, Operator<? extends Serializable> newChild) {
+  public void replaceChild(Operator<? extends Serializable> child,
+      Operator<? extends Serializable> newChild) {
     int childIndex = childOperators.indexOf(child);
     assert childIndex != -1;
     childOperators.set(childIndex, newChild);
   }
 
-  public void  removeChild(Operator<? extends Serializable> child) {
+  public void removeChild(Operator<? extends Serializable> child) {
     int childIndex = childOperators.indexOf(child);
     assert childIndex != -1;
-    if (childOperators.size() == 1)
+    if (childOperators.size() == 1) {
       childOperators = null;
-    else
+    } else {
       childOperators.remove(childIndex);
+    }
 
     int parentIndex = child.getParentOperators().indexOf(this);
     assert parentIndex != -1;
-    if (child.getParentOperators().size() == 1)
+    if (child.getParentOperators().size() == 1) {
       child.setParentOperators(null);
-    else
+    } else {
       child.getParentOperators().remove(parentIndex);
+    }
   }
 
   /**
-   * Replace one parent with another at the same position. Chilren of the new parent are not updated
-   * @param parent     the old parent
-   * @param newParent  the new parent
+   * Replace one parent with another at the same position. Chilren of the new
+   * parent are not updated
+   * 
+   * @param parent
+   *          the old parent
+   * @param newParent
+   *          the new parent
    */
-  public void  replaceParent(Operator<? extends Serializable> parent, Operator<? extends Serializable> newParent) {
+  public void replaceParent(Operator<? extends Serializable> parent,
+      Operator<? extends Serializable> newParent) {
     int parentIndex = parentOperators.indexOf(parent);
     assert parentIndex != -1;
     parentOperators.set(parentIndex, newParent);
   }
 
   private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by an operator. It dumps
+    // A very simple counter to keep track of number of rows processed by an
+    // operator. It dumps
     // every 1 million times, and quickly before that
-    if (cntr >= 1000000)
+    if (cntr >= 1000000) {
       return cntr + 1000000;
+    }
 
     return 10 * cntr;
   }
 
-  protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+  protected void forward(Object row, ObjectInspector rowInspector)
+      throws HiveException {
 
     if ((++outputRows % 1000) == 0) {
       if (counterNameToEnum != null) {
@@ -578,14 +637,17 @@
     }
 
     // For debugging purposes:
-    // System.out.println("" + this.getClass() + ": " + SerDeUtils.getJSONString(row, rowInspector));
-    // System.out.println("" + this.getClass() + ">> " + ObjectInspectorUtils.getObjectInspectorName(rowInspector));
+    // System.out.println("" + this.getClass() + ": " +
+    // SerDeUtils.getJSONString(row, rowInspector));
+    // System.out.println("" + this.getClass() + ">> " +
+    // ObjectInspectorUtils.getObjectInspectorName(rowInspector));
 
     if (childOperatorsArray == null && childOperators != null) {
-      throw new HiveException("Internal Hive error during operator initialization.");
+      throw new HiveException(
+          "Internal Hive error during operator initialization.");
     }
 
-    if((childOperatorsArray == null) || (getDone())) {
+    if ((childOperatorsArray == null) || (getDone())) {
       return;
     }
 
@@ -593,7 +655,7 @@
     for (int i = 0; i < childOperatorsArray.length; i++) {
       Operator<? extends Serializable> o = childOperatorsArray[i];
       if (o.getDone()) {
-        childrenDone ++;
+        childrenDone++;
       } else {
         o.process(row, childOperatorsTag[i]);
       }
@@ -606,7 +668,7 @@
   }
 
   public void resetStats() {
-    for(Enum<?> e: statsMap.keySet()) {
+    for (Enum<?> e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
     }
   }
@@ -615,23 +677,24 @@
     public void func(Operator<? extends Serializable> op);
   }
 
-  public void preorderMap (OperatorFunc opFunc) {
+  public void preorderMap(OperatorFunc opFunc) {
     opFunc.func(this);
-    if(childOperators != null) {
-      for(Operator<? extends Serializable> o: childOperators) {
+    if (childOperators != null) {
+      for (Operator<? extends Serializable> o : childOperators) {
         o.preorderMap(opFunc);
       }
     }
   }
 
-  public void logStats () {
-    for(Enum<?> e: statsMap.keySet()) {
+  public void logStats() {
+    for (Enum<?> e : statsMap.keySet()) {
       LOG.info(e.toString() + ":" + statsMap.get(e).toString());
     }
   }
 
   /**
    * Implements the getName function for the Node Interface.
+   * 
    * @return the name of the operator
    */
   public String getName() {
@@ -639,8 +702,9 @@
   }
 
   /**
-   * Returns a map of output column name to input expression map
-   * Note that currently it returns only key columns for ReduceSink and GroupBy operators
+   * Returns a map of output column name to input expression map Note that
+   * currently it returns only key columns for ReduceSink and GroupBy operators
+   * 
    * @return null if the operator doesn't change columns
    */
   public Map<String, exprNodeDesc> getColumnExprMap() {
@@ -657,7 +721,7 @@
     }
     StringBuilder s = new StringBuilder();
     s.append("\n");
-    while(level > 0) {
+    while (level > 0) {
       s.append("  ");
       level--;
     }
@@ -669,8 +733,9 @@
   }
 
   public String dump(int level, HashSet<Integer> seenOpts) {
-    if ( seenOpts.contains(new Integer(id)))
+    if (seenOpts.contains(new Integer(id))) {
       return null;
+    }
     seenOpts.add(new Integer(id));
 
     StringBuilder s = new StringBuilder();
@@ -683,7 +748,7 @@
       s.append(ls);
       s.append("  <Children>");
       for (Operator<? extends Serializable> o : childOperators) {
-        s.append(o.dump(level+2, seenOpts));
+        s.append(o.dump(level + 2, seenOpts));
       }
       s.append(ls);
       s.append("  <\\Children>");
@@ -694,7 +759,7 @@
       s.append("  <Parent>");
       for (Operator<? extends Serializable> o : parentOperators) {
         s.append("Id = " + o.id + " ");
-        s.append(o.dump(level,seenOpts));
+        s.append(o.dump(level, seenOpts));
       }
       s.append("<\\Parent>");
     }
@@ -711,7 +776,7 @@
   protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] result = new ObjectInspector[evals.length];
-    for (int i=0; i<evals.length; i++) {
+    for (int i = 0; i < evals.length; i++) {
       result[i] = evals[i].initialize(rowInspector);
     }
     return result;
@@ -722,12 +787,12 @@
    * StructObjectInspector with integer field names.
    */
   protected static StructObjectInspector initEvaluatorsAndReturnStruct(
-      ExprNodeEvaluator[] evals, List<String> outputColName, ObjectInspector rowInspector)
-      throws HiveException {
-    ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, rowInspector);
+      ExprNodeEvaluator[] evals, List<String> outputColName,
+      ObjectInspector rowInspector) throws HiveException {
+    ObjectInspector[] fieldObjectInspectors = initEvaluators(evals,
+        rowInspector);
     return ObjectInspectorFactory.getStandardStructObjectInspector(
-        outputColName,
-        Arrays.asList(fieldObjectInspectors));
+        outputColName, Arrays.asList(fieldObjectInspectors));
   }
 
   /**
@@ -738,46 +803,7 @@
    * TODO This is a hack for hadoop 0.17 which only supports enum counters
    */
   public static enum ProgressCounter {
-    C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, 
-    C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, 
-    C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, 
-    C31, C32, C33, C34, C35, C36, C37, C38, C39, C40, 
-    C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, 
-    C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, 
-    C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, 
-    C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
-    C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, 
-    C91, C92, C93, C94, C95, C96, C97, C98, C99, C100, 
-    C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, 
-    C111, C112, C113, C114, C115, C116, C117, C118, C119, C120, 
-    C121, C122, C123, C124, C125, C126, C127, C128, C129, C130,
-    C131, C132, C133, C134, C135, C136, C137, C138, C139, C140,
-    C141, C142, C143, C144, C145, C146, C147, C148, C149, C150,
-    C151, C152, C153, C154, C155, C156, C157, C158, C159, C160,
-    C161, C162, C163, C164, C165, C166, C167, C168, C169, C170,
-    C171, C172, C173, C174, C175, C176, C177, C178, C179, C180,
-    C181, C182, C183, C184, C185, C186, C187, C188, C189, C190,
-    C191, C192, C193, C194, C195, C196, C197, C198, C199, C200,
-    C201, C202, C203, C204, C205, C206, C207, C208, C209, C210,
-    C211, C212, C213, C214, C215, C216, C217, C218, C219, C220,
-    C221, C222, C223, C224, C225, C226, C227, C228, C229, C230,
-    C231, C232, C233, C234, C235, C236, C237, C238, C239, C240,
-    C241, C242, C243, C244, C245, C246, C247, C248, C249, C250,
-    C251, C252, C253, C254, C255, C256, C257, C258, C259, C260,
-    C261, C262, C263, C264, C265, C266, C267, C268, C269, C270,
-    C271, C272, C273, C274, C275, C276, C277, C278, C279, C280,
-    C281, C282, C283, C284, C285, C286, C287, C288, C289, C290,
-    C291, C292, C293, C294, C295, C296, C297, C298, C299, C300,
-    C301, C302, C303, C304, C305, C306, C307, C308, C309, C310,
-    C311, C312, C313, C314, C315, C316, C317, C318, C319, C320,
-    C321, C322, C323, C324, C325, C326, C327, C328, C329, C330,
-    C331, C332, C333, C334, C335, C336, C337, C338, C339, C340,
-    C341, C342, C343, C344, C345, C346, C347, C348, C349, C350,
-    C351, C352, C353, C354, C355, C356, C357, C358, C359, C360,
-    C361, C362, C363, C364, C365, C366, C367, C368, C369, C370,
-    C371, C372, C373, C374, C375, C376, C377, C378, C379, C380,
-    C381, C382, C383, C384, C385, C386, C387, C388, C389, C390,
-    C391, C392, C393, C394, C395, C396, C397, C398, C399, C400
+    C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80, C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96, C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112, C113, C114, C115, C116, C117, C118, C119, C120, C121, C122, C123, C124, C125, C126, C127, C128, C129, C130, C131, C132, C133, C134, C135, C136, C137, C138, C139, C140, C141, C142, C143, C144, C145, C146, C147, C148, C149, C150, C151, C152, C153, C154, C155, C156, C157, C158, C159, C160, C161, C162, C163, C164, C165, C166, C167, C168, C169, C170, C171, C172, C173, C174, C175, C176, C177, C178, C179, C180, C181, C182, C
 183, C184, C185, C186, C187, C188, C189, C190, C191, C192, C193, C194, C195, C196, C197, C198, C199, C200, C201, C202, C203, C204, C205, C206, C207, C208, C209, C210, C211, C212, C213, C214, C215, C216, C217, C218, C219, C220, C221, C222, C223, C224, C225, C226, C227, C228, C229, C230, C231, C232, C233, C234, C235, C236, C237, C238, C239, C240, C241, C242, C243, C244, C245, C246, C247, C248, C249, C250, C251, C252, C253, C254, C255, C256, C257, C258, C259, C260, C261, C262, C263, C264, C265, C266, C267, C268, C269, C270, C271, C272, C273, C274, C275, C276, C277, C278, C279, C280, C281, C282, C283, C284, C285, C286, C287, C288, C289, C290, C291, C292, C293, C294, C295, C296, C297, C298, C299, C300, C301, C302, C303, C304, C305, C306, C307, C308, C309, C310, C311, C312, C313, C314, C315, C316, C317, C318, C319, C320, C321, C322, C323, C324, C325, C326, C327, C328, C329, C330, C331, C332, C333, C334, C335, C336, C337, C338, C339, C340, C341, C342, C343, C344, C345, C346, C347, 
 C348, C349, C350, C351, C352, C353, C354, C355, C356, C357, C358, C359, C360, C361, C362, C363, C364, C365, C366, C367, C368, C369, C370, C371, C372, C373, C374, C375, C376, C377, C378, C379, C380, C381, C382, C383, C384, C385, C386, C387, C388, C389, C390, C391, C392, C393, C394, C395, C396, C397, C398, C399, C400
   };
 
   private static int totalNumCntrs = 400;
@@ -788,9 +814,8 @@
   transient protected Map<String, Long> counters;
 
   /**
-   * keeps track of unique ProgressCounter enums used
-   * this value is used at compile time while assigning ProgressCounter
-   * enums to counter names
+   * keeps track of unique ProgressCounter enums used this value is used at
+   * compile time while assigning ProgressCounter enums to counter names
    */
   private static int lastEnumUsed;
 
@@ -804,15 +829,14 @@
   /**
    * this is called before operator process to buffer some counters
    */
-  private void preProcessCounter()
-  {
+  private void preProcessCounter() {
     inputRows++;
 
     if (counterNameToEnum != null) {
       if ((inputRows % 1000) == 0) {
         incrCounter(numInputRowsCntr, inputRows);
         incrCounter(timeTakenCntr, totalTime);
-        inputRows = 0 ;
+        inputRows = 0;
         totalTime = 0;
       }
       beginTime = System.currentTimeMillis();
@@ -822,28 +846,31 @@
   /**
    * this is called after operator process to buffer some counters
    */
-  private void postProcessCounter()
-  {
-    if (counterNameToEnum != null)
+  private void postProcessCounter() {
+    if (counterNameToEnum != null) {
       totalTime += (System.currentTimeMillis() - beginTime);
+    }
   }
 
-
   /**
    * this is called in operators in map or reduce tasks
+   * 
    * @param name
    * @param amount
    */
-  protected void incrCounter(String name, long amount)
-  {
+  protected void incrCounter(String name, long amount) {
     String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
-    // Currently, we maintain fixed number of counters per plan - in case of a bigger tree, we may run out of them
-    if (pc == null)
-      LOG.warn("Using too many counters. Increase the total number of counters for " + counterName);
-    else if (reporter != null)
+    // Currently, we maintain fixed number of counters per plan - in case of a
+    // bigger tree, we may run out of them
+    if (pc == null) {
+      LOG
+          .warn("Using too many counters. Increase the total number of counters for "
+              + counterName);
+    } else if (reporter != null) {
       reporter.incrCounter(pc, amount);
+    }
   }
 
   public ArrayList<String> getCounterNames() {
@@ -872,7 +899,9 @@
 
   /**
    * called in ExecDriver.progress periodically
-   * @param ctrs counters from the running job
+   * 
+   * @param ctrs
+   *          counters from the running job
    */
   @SuppressWarnings("unchecked")
   public void updateCounters(Counters ctrs) {
@@ -880,85 +909,101 @@
       counters = new HashMap<String, Long>();
     }
 
-    // For some old unit tests, the counters will not be populated. Eventually, the old tests should be removed
-    if (counterNameToEnum == null)
+    // For some old unit tests, the counters will not be populated. Eventually,
+    // the old tests should be removed
+    if (counterNameToEnum == null) {
       return;
+    }
 
-    for (Map.Entry<String, ProgressCounter> counter: counterNameToEnum.entrySet()) {
+    for (Map.Entry<String, ProgressCounter> counter : counterNameToEnum
+        .entrySet()) {
       counters.put(counter.getKey(), ctrs.getCounter(counter.getValue()));
     }
     // update counters of child operators
     // this wont be an infinite loop since the operator graph is acyclic
     // but, some operators may be updated more than once and that's ok
     if (getChildren() != null) {
-      for (Node op: getChildren()) {
-        ((Operator<? extends Serializable>)op).updateCounters(ctrs);
+      for (Node op : getChildren()) {
+        ((Operator<? extends Serializable>) op).updateCounters(ctrs);
       }
     }
   }
 
   /**
-   * Recursively check this operator and its descendants to see if the
-   * fatal error counter is set to non-zero.
+   * Recursively check this operator and its descendants to see if the fatal
+   * error counter is set to non-zero.
+   * 
    * @param ctrs
    */
   public boolean checkFatalErrors(Counters ctrs, StringBuffer errMsg) {
-    if ( counterNameToEnum == null )
+    if (counterNameToEnum == null) {
       return false;
-    
+    }
+
     String counterName = "CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr;
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
-    // Currently, we maintain fixed number of counters per plan - in case of a bigger tree, we may run out of them
-    if (pc == null)
-      LOG.warn("Using too many counters. Increase the total number of counters for " + counterName);
-    else {
+    // Currently, we maintain fixed number of counters per plan - in case of a
+    // bigger tree, we may run out of them
+    if (pc == null) {
+      LOG
+          .warn("Using too many counters. Increase the total number of counters for "
+              + counterName);
+    } else {
       long value = ctrs.getCounter(pc);
       fatalErrorMessage(errMsg, value);
-      if ( value != 0 ) 
+      if (value != 0) {
         return true;
+      }
     }
-    
+
     if (getChildren() != null) {
-      for (Node op: getChildren()) {
-        if (((Operator<? extends Serializable>)op).checkFatalErrors(ctrs, errMsg)) {
+      for (Node op : getChildren()) {
+        if (((Operator<? extends Serializable>) op).checkFatalErrors(ctrs,
+            errMsg)) {
           return true;
         }
       }
     }
     return false;
   }
-  
-  /** 
+
+  /**
    * Get the fatal error message based on counter's code.
-   * @param errMsg error message should be appended to this output parameter.
-   * @param counterValue input counter code.
+   * 
+   * @param errMsg
+   *          error message should be appended to this output parameter.
+   * @param counterValue
+   *          input counter code.
    */
   protected void fatalErrorMessage(StringBuffer errMsg, long counterValue) {
   }
-  
+
   // A given query can have multiple map-reduce jobs
   public static void resetLastEnumUsed() {
     lastEnumUsed = 0;
   }
 
   /**
-   * Called only in SemanticAnalyzer after all operators have added their
-   * own set of counter names
+   * Called only in SemanticAnalyzer after all operators have added their own
+   * set of counter names
    */
   public void assignCounterNameToEnum() {
     if (counterNameToEnum != null) {
       return;
     }
     counterNameToEnum = new HashMap<String, ProgressCounter>();
-    for (String counterName: getCounterNames()) {
+    for (String counterName : getCounterNames()) {
       ++lastEnumUsed;
 
       // TODO Hack for hadoop-0.17
-      // Currently, only maximum number of 'totalNumCntrs' can be used. If you want
-      // to add more counters, increase the number of counters in ProgressCounter
+      // Currently, only maximum number of 'totalNumCntrs' can be used. If you
+      // want
+      // to add more counters, increase the number of counters in
+      // ProgressCounter
       if (lastEnumUsed > totalNumCntrs) {
-        LOG.warn("Using too many counters. Increase the total number of counters");
+        LOG
+            .warn("Using too many counters. Increase the total number of counters");
         return;
       }
       String enumName = "C" + lastEnumUsed;
@@ -967,10 +1012,10 @@
     }
   }
 
-  protected static String numInputRowsCntr  = "NUM_INPUT_ROWS";
+  protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
   protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
-  protected static String timeTakenCntr     = "TIME_TAKEN";
-  protected static String fatalErrorCntr    = "FATAL_ERROR";
+  protected static String timeTakenCntr = "TIME_TAKEN";
+  protected static String fatalErrorCntr = "FATAL_ERROR";
 
   public void initializeCounters() {
     initOperatorId();
@@ -986,32 +1031,32 @@
   }
 
   /*
-   * By default, the list is empty - if an operator wants to add more counters, it should override this method
-   * and provide the new list.
-
+   * By default, the list is empty - if an operator wants to add more counters,
+   * it should override this method and provide the new list.
    */
   private List<String> getAdditionalCounters() {
     return null;
   }
- 
+
   public HashMap<String, ProgressCounter> getCounterNameToEnum() {
     return counterNameToEnum;
   }
 
-  public void setCounterNameToEnum(HashMap<String, ProgressCounter> counterNameToEnum) {
+  public void setCounterNameToEnum(
+      HashMap<String, ProgressCounter> counterNameToEnum) {
     this.counterNameToEnum = counterNameToEnum;
   }
 
-   /**
-   * Should be overridden to return the type of the specific operator among
-    * the types in OperatorType
-    *
-    * @return OperatorType.* or -1 if not overridden
-    */
-   public int getType() {
-     assert false;
-     return -1;
-   }
+  /**
+   * Should be overridden to return the type of the specific operator among the
+   * types in OperatorType
+   * 
+   * @return OperatorType.* or -1 if not overridden
+   */
+  public int getType() {
+    assert false;
+    return -1;
+  }
 
   public void setGroupKeyObject(Object keyObject) {
     this.groupKeyObject = keyObject;