Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [8/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Thu Jan 21 10:37:58 2010
@@ -18,15 +18,29 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.*;
-import java.io.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.plan.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.collectDesc;
+import org.apache.hadoop.hive.ql.plan.extractDesc;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.filterDesc;
+import org.apache.hadoop.hive.ql.plan.forwardDesc;
+import org.apache.hadoop.hive.ql.plan.groupByDesc;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc;
+import org.apache.hadoop.hive.ql.plan.limitDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.scriptDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.ql.plan.udtfDesc;
+import org.apache.hadoop.hive.ql.plan.unionDesc;
 
 public class OperatorFactory {
-  
+
   public final static class opTuple<T extends Serializable> {
     public Class<T> descClass;
     public Class<? extends Operator<T>> opClass;
@@ -39,32 +53,40 @@
 
   public static ArrayList<opTuple> opvec;
   static {
-    opvec = new ArrayList<opTuple> ();
-    opvec.add(new opTuple<filterDesc> (filterDesc.class, FilterOperator.class));
-    opvec.add(new opTuple<selectDesc> (selectDesc.class, SelectOperator.class));
-    opvec.add(new opTuple<forwardDesc> (forwardDesc.class, ForwardOperator.class));
-    opvec.add(new opTuple<fileSinkDesc> (fileSinkDesc.class, FileSinkOperator.class));
-    opvec.add(new opTuple<collectDesc> (collectDesc.class, CollectOperator.class));
-    opvec.add(new opTuple<scriptDesc> (scriptDesc.class, ScriptOperator.class));
-    opvec.add(new opTuple<reduceSinkDesc> (reduceSinkDesc.class, ReduceSinkOperator.class));
-    opvec.add(new opTuple<extractDesc> (extractDesc.class, ExtractOperator.class));
-    opvec.add(new opTuple<groupByDesc> (groupByDesc.class, GroupByOperator.class));
-    opvec.add(new opTuple<joinDesc> (joinDesc.class, JoinOperator.class));
-    opvec.add(new opTuple<mapJoinDesc> (mapJoinDesc.class, MapJoinOperator.class));
-    opvec.add(new opTuple<limitDesc> (limitDesc.class, LimitOperator.class));
-    opvec.add(new opTuple<tableScanDesc> (tableScanDesc.class, TableScanOperator.class));
-    opvec.add(new opTuple<unionDesc> (unionDesc.class, UnionOperator.class));
-    opvec.add(new opTuple<udtfDesc> (udtfDesc.class, UDTFOperator.class));
-    opvec.add(new opTuple<lateralViewJoinDesc>(lateralViewJoinDesc.class, LateralViewJoinOperator.class));
+    opvec = new ArrayList<opTuple>();
+    opvec.add(new opTuple<filterDesc>(filterDesc.class, FilterOperator.class));
+    opvec.add(new opTuple<selectDesc>(selectDesc.class, SelectOperator.class));
+    opvec
+        .add(new opTuple<forwardDesc>(forwardDesc.class, ForwardOperator.class));
+    opvec.add(new opTuple<fileSinkDesc>(fileSinkDesc.class,
+        FileSinkOperator.class));
+    opvec
+        .add(new opTuple<collectDesc>(collectDesc.class, CollectOperator.class));
+    opvec.add(new opTuple<scriptDesc>(scriptDesc.class, ScriptOperator.class));
+    opvec.add(new opTuple<reduceSinkDesc>(reduceSinkDesc.class,
+        ReduceSinkOperator.class));
+    opvec
+        .add(new opTuple<extractDesc>(extractDesc.class, ExtractOperator.class));
+    opvec
+        .add(new opTuple<groupByDesc>(groupByDesc.class, GroupByOperator.class));
+    opvec.add(new opTuple<joinDesc>(joinDesc.class, JoinOperator.class));
+    opvec
+        .add(new opTuple<mapJoinDesc>(mapJoinDesc.class, MapJoinOperator.class));
+    opvec.add(new opTuple<limitDesc>(limitDesc.class, LimitOperator.class));
+    opvec.add(new opTuple<tableScanDesc>(tableScanDesc.class,
+        TableScanOperator.class));
+    opvec.add(new opTuple<unionDesc>(unionDesc.class, UnionOperator.class));
+    opvec.add(new opTuple<udtfDesc>(udtfDesc.class, UDTFOperator.class));
+    opvec.add(new opTuple<lateralViewJoinDesc>(lateralViewJoinDesc.class,
+        LateralViewJoinOperator.class));
   }
-              
 
   public static <T extends Serializable> Operator<T> get(Class<T> opClass) {
-      
-    for(opTuple o: opvec) {
-      if(o.descClass == opClass) {
+
+    for (opTuple o : opvec) {
+      if (o.descClass == opClass) {
         try {
-          Operator<T> op = (Operator<T>)o.opClass.newInstance();
+          Operator<T> op = (Operator<T>) o.opClass.newInstance();
           op.initializeCounters();
           return op;
         } catch (Exception e) {
@@ -73,33 +95,37 @@
         }
       }
     }
-    throw new RuntimeException ("No operator for descriptor class " + opClass.getName());
+    throw new RuntimeException("No operator for descriptor class "
+        + opClass.getName());
   }
 
-  public static <T extends Serializable> Operator<T> get(Class<T> opClass, RowSchema rwsch) {
-    
+  public static <T extends Serializable> Operator<T> get(Class<T> opClass,
+      RowSchema rwsch) {
+
     Operator<T> ret = get(opClass);
     ret.setSchema(rwsch);
     return ret;
   }
 
   /**
-   * Returns an operator given the conf and a list of children operators.  
+   * Returns an operator given the conf and a list of children operators.
    */
-  public static <T extends Serializable> Operator<T> get(T conf, Operator<? extends Serializable> ... oplist) {
-    Operator<T> ret = get((Class <T>)conf.getClass());
+  public static <T extends Serializable> Operator<T> get(T conf,
+      Operator<? extends Serializable>... oplist) {
+    Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
-    if(oplist.length == 0)
+    if (oplist.length == 0) {
       return (ret);
+    }
 
-    ArrayList<Operator<? extends Serializable>> clist = new ArrayList<Operator<? extends Serializable>> ();
-    for(Operator op: oplist) {
+    ArrayList<Operator<? extends Serializable>> clist = new ArrayList<Operator<? extends Serializable>>();
+    for (Operator op : oplist) {
       clist.add(op);
     }
     ret.setChildOperators(clist);
-    
+
     // Add this parent to the children
-    for(Operator op: oplist) {
+    for (Operator op : oplist) {
       List<Operator<? extends Serializable>> parents = op.getParentOperators();
       if (parents == null) {
         parents = new ArrayList<Operator<? extends Serializable>>();
@@ -111,25 +137,28 @@
   }
 
   /**
-   * Returns an operator given the conf and a list of children operators.  
+   * Returns an operator given the conf and a list of children operators.
    */
-  public static <T extends Serializable> Operator<T> get(T conf, RowSchema rwsch, Operator ... oplist) {
+  public static <T extends Serializable> Operator<T> get(T conf,
+      RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = get(conf, oplist);
     ret.setSchema(rwsch);
     return (ret);
   }
 
   /**
-   * Returns an operator given the conf and a list of parent operators.  
+   * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf, Operator ... oplist) {
-    Operator<T> ret = get((Class <T>)conf.getClass());
+  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+      Operator... oplist) {
+    Operator<T> ret = get((Class<T>) conf.getClass());
     ret.setConf(conf);
-    if(oplist.length == 0)
+    if (oplist.length == 0) {
       return (ret);
+    }
 
     // Add the new operator as child of each of the passed in operators
-    for(Operator op: oplist) {
+    for (Operator op : oplist) {
       List<Operator> children = op.getChildOperators();
       if (children == null) {
         children = new ArrayList<Operator>();
@@ -140,18 +169,20 @@
 
     // add parents for the newly created operator
     List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
-    for(Operator op: oplist)
+    for (Operator op : oplist) {
       parent.add(op);
-    
+    }
+
     ret.setParentOperators(parent);
 
     return (ret);
   }
 
   /**
-   * Returns an operator given the conf and a list of parent operators.  
+   * Returns an operator given the conf and a list of parent operators.
    */
-  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf, RowSchema rwsch, Operator ... oplist) {
+  public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+      RowSchema rwsch, Operator... oplist) {
     Operator<T> ret = getAndMakeChild(conf, oplist);
     ret.setSchema(rwsch);
     return (ret);
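
[Editor's note] For readers skimming the patch, the pattern above is a simple class-keyed registry: each plan descriptor class is paired with an operator class, and get() instantiates the operator reflectively. A minimal, self-contained sketch of the same idea follows; it is illustrative only, not Hive code, and all names in it are invented.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    public class DescriptorRegistrySketch {

      // One registry entry: a descriptor class paired with the class to instantiate for it.
      static final class Entry {
        final Class<? extends Serializable> descClass;
        final Class<?> implClass;
        Entry(Class<? extends Serializable> descClass, Class<?> implClass) {
          this.descClass = descClass;
          this.implClass = implClass;
        }
      }

      static final List<Entry> REGISTRY = new ArrayList<Entry>();

      // Look up the implementation registered for a descriptor class and create it reflectively.
      static Object create(Class<? extends Serializable> descClass) {
        for (Entry e : REGISTRY) {
          if (e.descClass == descClass) {
            try {
              return e.implClass.newInstance();
            } catch (Exception ex) {
              throw new RuntimeException(ex);
            }
          }
        }
        throw new RuntimeException("No implementation for " + descClass.getName());
      }
    }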

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java Thu Jan 21 10:37:58 2010
@@ -25,10 +25,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
-
 public interface RecordReader {
 
-  public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException;
+  public void initialize(InputStream in, Configuration conf, Properties tbl)
+      throws IOException;
 
   public Writable createRow() throws IOException;
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java Thu Jan 21 10:37:58 2010
@@ -24,10 +24,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
-
 public interface RecordWriter {
 
-  public void initialize(OutputStream in, Configuration conf) throws IOException;
+  public void initialize(OutputStream in, Configuration conf)
+      throws IOException;
+
   public void write(Writable row) throws IOException;
+
   public void close() throws IOException;
 }
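
[Editor's note] The two interfaces above define the contract for streaming rows into and out of a user script. As a hedged illustration of how a RecordWriter might be written against that contract, a newline-delimited writer could look like the sketch below; the class is hypothetical and not part of the patch, and it assumes rows arrive as Text.

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.RecordWriter;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class NewlineRecordWriterSketch implements RecordWriter {

      private OutputStream out;

      public void initialize(OutputStream out, Configuration conf) throws IOException {
        this.out = out;
      }

      public void write(Writable row) throws IOException {
        Text text = (Text) row;                 // assumption: rows are serialized as Text
        out.write(text.getBytes(), 0, text.getLength());
        out.write('\n');
      }

      public void close() throws IOException {
        out.flush();
        out.close();
      }
    }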

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Jan 21 10:37:58 2010
@@ -42,69 +42,73 @@
 /**
  * Reduce Sink Operator sends output to the reduce stage
  **/
-public class ReduceSinkOperator extends TerminalOperator <reduceSinkDesc> implements Serializable {
+public class ReduceSinkOperator extends TerminalOperator<reduceSinkDesc>
+    implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
   /**
-   * The evaluators for the key columns.
-   * Key columns decide the sort order on the reducer side.
-   * Key columns are passed to the reducer in the "key".
+   * The evaluators for the key columns. Key columns decide the sort order on
+   * the reducer side. Key columns are passed to the reducer in the "key".
    */
   transient protected ExprNodeEvaluator[] keyEval;
   /**
-   * The evaluators for the value columns.
-   * Value columns are passed to reducer in the "value". 
+   * The evaluators for the value columns. Value columns are passed to reducer
+   * in the "value".
    */
   transient protected ExprNodeEvaluator[] valueEval;
   /**
-   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
-   * Partition columns decide the reducer that the current row goes to.
-   * Partition columns are not passed to reducer.
+   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+   * Hive language). Partition columns decide the reducer that the current row
+   * goes to. Partition columns are not passed to reducer.
    */
   transient protected ExprNodeEvaluator[] partitionEval;
-  
-  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
+
+  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+  // ready
   transient Serializer keySerializer;
   transient boolean keyIsText;
   transient Serializer valueSerializer;
   transient int tag;
   transient byte[] tagByte = new byte[1];
-  
+
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
     try {
       keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
-      int i=0;
-      for(exprNodeDesc e: conf.getKeyCols()) {
+      int i = 0;
+      for (exprNodeDesc e : conf.getKeyCols()) {
         keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
       }
 
       valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
-      i=0;
-      for(exprNodeDesc e: conf.getValueCols()) {
+      i = 0;
+      for (exprNodeDesc e : conf.getValueCols()) {
         valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
       }
 
       partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
-      i=0;
-      for(exprNodeDesc e: conf.getPartitionCols()) {
+      i = 0;
+      for (exprNodeDesc e : conf.getPartitionCols()) {
         partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
       }
 
       tag = conf.getTag();
-      tagByte[0] = (byte)tag;
+      tagByte[0] = (byte) tag;
       LOG.info("Using tag = " + tag);
 
       tableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer)keyTableDesc.getDeserializerClass().newInstance();
+      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+          .newInstance();
       keySerializer.initialize(null, keyTableDesc.getProperties());
       keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-      
+
       tableDesc valueTableDesc = conf.getValueSerializeInfo();
-      valueSerializer = (Serializer)valueTableDesc.getDeserializerClass().newInstance();
+      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+          .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
-      
+
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
@@ -116,65 +120,72 @@
   transient InspectableObject tempInspectableObject = new InspectableObject();
   transient HiveKey keyWritable = new HiveKey();
   transient Writable value;
-  
+
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
   transient Object[] cachedKeys;
   transient Object[] cachedValues;
-  
+
   boolean firstRow;
-  
+
   transient Random random;
+
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
         firstRow = false;
-        keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, conf.getOutputKeyColumnNames(), rowInspector);
-        valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf.getOutputValueColumnNames(), rowInspector);
+        keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, conf
+            .getOutputKeyColumnNames(), rowInspector);
+        valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+            .getOutputValueColumnNames(), rowInspector);
         partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
 
         cachedKeys = new Object[keyEval.length];
         cachedValues = new Object[valueEval.length];
       }
-      
-      
+
       // Evaluate the keys
-      for (int i=0; i<keyEval.length; i++) {
+      for (int i = 0; i < keyEval.length; i++) {
         cachedKeys[i] = keyEval[i].evaluate(row);
       }
-      
+
       // Serialize the keys and append the tag
       if (keyIsText) {
-        Text key = (Text)keySerializer.serialize(cachedKeys, keyObjectInspector);
+        Text key = (Text) keySerializer.serialize(cachedKeys,
+            keyObjectInspector);
         if (tag == -1) {
           keyWritable.set(key.getBytes(), 0, key.getLength());
         } else {
           int keyLength = key.getLength();
-          keyWritable.setSize(keyLength+1);
+          keyWritable.setSize(keyLength + 1);
           System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
           keyWritable.get()[keyLength] = tagByte[0];
         }
       } else {
         // Must be BytesWritable
-        BytesWritable key = (BytesWritable)keySerializer.serialize(cachedKeys, keyObjectInspector);
+        BytesWritable key = (BytesWritable) keySerializer.serialize(cachedKeys,
+            keyObjectInspector);
         if (tag == -1) {
           keyWritable.set(key.get(), 0, key.getSize());
         } else {
           int keyLength = key.getSize();
-          keyWritable.setSize(keyLength+1);
+          keyWritable.setSize(keyLength + 1);
           System.arraycopy(key.get(), 0, keyWritable.get(), 0, keyLength);
           keyWritable.get()[keyLength] = tagByte[0];
         }
       }
-      
+
       // Set the HashCode
       int keyHashCode = 0;
       if (partitionEval.length == 0) {
-        // If no partition cols, just distribute the data uniformly to provide better
-        // load balance.  If the requirement is to have a single reducer, we should set
+        // If no partition cols, just distribute the data uniformly to provide
+        // better
+        // load balance. If the requirement is to have a single reducer, we
+        // should set
         // the number of reducers to 1.
         // Use a constant seed to make the code deterministic.
         if (random == null) {
@@ -184,47 +195,50 @@
       } else {
         for (int i = 0; i < partitionEval.length; i++) {
           Object o = partitionEval[i].evaluate(row);
-          keyHashCode = keyHashCode * 31 
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+          keyHashCode = keyHashCode * 31
+              + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
         }
       }
       keyWritable.setHashCode(keyHashCode);
-      
+
       // Evaluate the value
-      for (int i=0; i<valueEval.length; i++) {
+      for (int i = 0; i < valueEval.length; i++) {
         cachedValues[i] = valueEval[i].evaluate(row);
       }
       // Serialize the value
       value = valueSerializer.serialize(cachedValues, valueObjectInspector);
-      
+
     } catch (SerDeException e) {
       throw new HiveException(e);
     }
-    
+
     try {
       if (out != null) {
         out.collect(keyWritable, value);
-        // Since this is a terminal operator, update counters explicitly - forward is not called
+        // Since this is a terminal operator, update counters explicitly -
+        // forward is not called
         if (counterNameToEnum != null) {
-          ++this.outputRows;
-          if (this.outputRows % 1000 == 0) {
+          ++outputRows;
+          if (outputRows % 1000 == 0) {
             incrCounter(numOutputRowsCntr, outputRows);
-            this.outputRows = 0;
+            outputRows = 0;
           }
         }
       }
     } catch (IOException e) {
-      throw new HiveException (e);
+      throw new HiveException(e);
     }
   }
 
   /**
    * @return the name of the operator
    */
+  @Override
   public String getName() {
     return new String("RS");
   }
-  
+
+  @Override
   public int getType() {
     return OperatorType.REDUCESINK;
   }
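
[Editor's note] Two details of processOp above are easy to miss in the reformatting noise: the serialized key gets a single trailing tag byte (so the reducer can tell which input a row came from), and the partition hash is a 31-based rolling hash over the DISTRIBUTE BY columns. A standalone sketch of both ideas follows; it is illustrative only, and Hive itself hashes via ObjectInspectorUtils.hashCode rather than Object.hashCode.

    public class ReduceSinkKeySketch {

      // Append one tag byte to an already-serialized key, as keyWritable does above.
      static byte[] appendTag(byte[] keyBytes, int keyLength, byte tag) {
        byte[] tagged = new byte[keyLength + 1];
        System.arraycopy(keyBytes, 0, tagged, 0, keyLength);
        tagged[keyLength] = tag;
        return tagged;
      }

      // Rolling hash over the evaluated partition columns, used to pick a reducer.
      static int partitionHash(Object[] partitionCols) {
        int hash = 0;
        for (Object col : partitionCols) {
          hash = hash * 31 + (col == null ? 0 : col.hashCode());
        }
        return hash;
      }
    }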

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Jan 21 10:37:58 2010
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.*;
-import java.io.*;
+import java.io.Serializable;
+import java.util.Vector;
 
 /**
  * RowSchema Implementation
@@ -30,7 +30,8 @@
   private static final long serialVersionUID = 1L;
   private Vector<ColumnInfo> signature;
 
-  public RowSchema() {}
+  public RowSchema() {
+  }
 
   public RowSchema(Vector<ColumnInfo> signature) {
     this.signature = signature;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Thu Jan 21 10:37:58 2010
@@ -24,7 +24,6 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,15 +47,17 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
-
-public class ScriptOperator extends Operator<scriptDesc> implements Serializable {
+public class ScriptOperator extends Operator<scriptDesc> implements
+    Serializable {
 
   private static final long serialVersionUID = 1L;
 
+  public static enum Counter {
+    DESERIALIZE_ERRORS, SERIALIZE_ERRORS
+  }
 
-  public static enum Counter {DESERIALIZE_ERRORS, SERIALIZE_ERRORS}
-  transient private LongWritable deserialize_error_count = new LongWritable ();
-  transient private LongWritable serialize_error_count = new LongWritable ();
+  transient private final LongWritable deserialize_error_count = new LongWritable();
+  transient private final LongWritable serialize_error_count = new LongWritable();
 
   transient Thread outThread = null;
   transient Thread errThread = null;
@@ -69,14 +70,15 @@
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter = null;
 
-  static final String IO_EXCEPTION_BROKEN_PIPE_STRING= "Broken pipe";
+  static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
 
   /**
    * sends periodic reports back to the tracker.
    */
   transient AutoProgressor autoProgressor;
 
-  // first row - the process should only be started if necessary, as it may conflict with some
+  // first row - the process should only be started if necessary, as it may
+  // conflict with some
   // of the user assumptions.
   transient boolean firstRow;
 
@@ -89,7 +91,8 @@
     for (int i = 0; i < len; i++) {
       char c = var.charAt(i);
       char s;
-      if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
+      if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')
+          || (c >= 'a' && c <= 'z')) {
         s = c;
       } else {
         s = '_';
@@ -99,35 +102,33 @@
     return safe.toString();
   }
 
-  static void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
+  static void addJobConfToEnvironment(Configuration conf,
+      Map<String, String> env) {
     Iterator<Map.Entry<String, String>> it = conf.iterator();
     while (it.hasNext()) {
-      Map.Entry<String, String> en = (Map.Entry<String, String>) it.next();
-      String name = (String) en.getKey();
-      //String value = (String)en.getValue(); // does not apply variable expansion
+      Map.Entry<String, String> en = it.next();
+      String name = en.getKey();
+      // String value = (String)en.getValue(); // does not apply variable
+      // expansion
       String value = conf.get(name); // does variable expansion
       name = safeEnvVarName(name);
       env.put(name, value);
     }
   }
 
-
   /**
-   * Maps a relative pathname to an absolute pathname using the
-   * PATH enviroment.
+   * Maps a relative pathname to an absolute pathname using the PATH enviroment.
    */
-  public class PathFinder
-  {
-    String pathenv;        // a string of pathnames
-    String pathSep;        // the path seperator
-    String fileSep;        // the file seperator in a directory
+  public class PathFinder {
+    String pathenv; // a string of pathnames
+    String pathSep; // the path seperator
+    String fileSep; // the file seperator in a directory
 
     /**
-     * Construct a PathFinder object using the path from
-     * the specified system environment variable.
+     * Construct a PathFinder object using the path from the specified system
+     * environment variable.
      */
-    public PathFinder(String envpath)
-    {
+    public PathFinder(String envpath) {
       pathenv = System.getenv(envpath);
       pathSep = System.getProperty("path.separator");
       fileSep = System.getProperty("file.separator");
@@ -136,25 +137,22 @@
     /**
      * Appends the specified component to the path list
      */
-    public void prependPathComponent(String str)
-    {
+    public void prependPathComponent(String str) {
       pathenv = str + pathSep + pathenv;
     }
 
     /**
-     * Returns the full path name of this file if it is listed in the
-     * path
+     * Returns the full path name of this file if it is listed in the path
      */
-    public File getAbsolutePath(String filename)
-    {
-      if (pathenv == null || pathSep == null  || fileSep == null) {
+    public File getAbsolutePath(String filename) {
+      if (pathenv == null || pathSep == null || fileSep == null) {
         return null;
       }
-      int     val = -1;
-      String    classvalue = pathenv + pathSep;
+      int val = -1;
+      String classvalue = pathenv + pathSep;
 
-      while (((val = classvalue.indexOf(pathSep)) >= 0) &&
-             classvalue.length() > 0) {
+      while (((val = classvalue.indexOf(pathSep)) >= 0)
+          && classvalue.length() > 0) {
         //
         // Extract each entry from the pathenv
         //
@@ -170,18 +168,20 @@
             f = new File(entry + fileSep + filename);
           }
           //
-          // see if the filename matches and  we can read it
+          // see if the filename matches and we can read it
           //
           if (f.isFile() && f.canRead()) {
             return f;
           }
-        } catch (Exception exp){ }
-        classvalue = classvalue.substring(val+1).trim();
+        } catch (Exception exp) {
+        }
+        classvalue = classvalue.substring(val + 1).trim();
       }
       return null;
     }
   }
 
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     firstRow = true;
 
@@ -191,18 +191,22 @@
     try {
       this.hconf = hconf;
 
-      scriptOutputDeserializer = conf.getScriptOutputInfo().getDeserializerClass().newInstance();
-      scriptOutputDeserializer.initialize(hconf, conf.getScriptOutputInfo().getProperties());
-
-      scriptInputSerializer = (Serializer)conf.getScriptInputInfo().getDeserializerClass().newInstance();
-      scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo().getProperties());
+      scriptOutputDeserializer = conf.getScriptOutputInfo()
+          .getDeserializerClass().newInstance();
+      scriptOutputDeserializer.initialize(hconf, conf.getScriptOutputInfo()
+          .getProperties());
+
+      scriptInputSerializer = (Serializer) conf.getScriptInputInfo()
+          .getDeserializerClass().newInstance();
+      scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo()
+          .getProperties());
 
       outputObjInspector = scriptOutputDeserializer.getObjectInspector();
 
       // initialize all children before starting the script
       initializeChildren(hconf);
     } catch (Exception e) {
-      throw new HiveException ("Cannot initialize ScriptOperator", e);
+      throw new HiveException("Cannot initialize ScriptOperator", e);
     }
   }
 
@@ -215,17 +219,20 @@
   }
 
   void displayBrokenPipeInfo() {
-    LOG.info("The script did not consume all input data. This is considered as an error.");
-    LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString() + "=true; to ignore it.");
+    LOG
+        .info("The script did not consume all input data. This is considered as an error.");
+    LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString()
+        + "=true; to ignore it.");
     return;
   }
 
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     // initialize the user's process only when you recieve the first row
     if (firstRow) {
       firstRow = false;
       try {
-        String [] cmdArgs = splitArgs(conf.getScriptCmd());
+        String[] cmdArgs = splitArgs(conf.getScriptCmd());
 
         String prog = cmdArgs[0];
         File currentDir = new File(".").getAbsoluteFile();
@@ -240,64 +247,78 @@
           f = null;
         }
 
-        String [] wrappedCmdArgs = addWrapper(cmdArgs);
+        String[] wrappedCmdArgs = addWrapper(cmdArgs);
         LOG.info("Executing " + Arrays.asList(wrappedCmdArgs));
-        LOG.info("tablename=" + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
-        LOG.info("partname=" + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
+        LOG.info("tablename="
+            + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
+        LOG.info("partname="
+            + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
         LOG.info("alias=" + alias);
 
         ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs);
         Map<String, String> env = pb.environment();
         addJobConfToEnvironment(hconf, env);
-        env.put(safeEnvVarName(HiveConf.ConfVars.HIVEALIAS.varname), String.valueOf(alias));
+        env.put(safeEnvVarName(HiveConf.ConfVars.HIVEALIAS.varname), String
+            .valueOf(alias));
 
-        // Create an environment variable that uniquely identifies this script operator
-        String idEnvVarName = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESCRIPTIDENVVAR);
-        String idEnvVarVal = this.getOperatorId();
+        // Create an environment variable that uniquely identifies this script
+        // operator
+        String idEnvVarName = HiveConf.getVar(hconf,
+            HiveConf.ConfVars.HIVESCRIPTIDENVVAR);
+        String idEnvVarVal = getOperatorId();
         env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
 
-        scriptPid = pb.start();       // Runtime.getRuntime().exec(wrappedCmdArgs);
+        scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
 
-        DataOutputStream scriptOut = new DataOutputStream(new BufferedOutputStream(scriptPid.getOutputStream()));
-        DataInputStream  scriptIn = new DataInputStream(new BufferedInputStream(scriptPid.getInputStream()));
-        DataInputStream  scriptErr = new DataInputStream(new BufferedInputStream(scriptPid.getErrorStream()));
+        DataOutputStream scriptOut = new DataOutputStream(
+            new BufferedOutputStream(scriptPid.getOutputStream()));
+        DataInputStream scriptIn = new DataInputStream(new BufferedInputStream(
+            scriptPid.getInputStream()));
+        DataInputStream scriptErr = new DataInputStream(
+            new BufferedInputStream(scriptPid.getErrorStream()));
 
         scriptOutWriter = conf.getInRecordWriterClass().newInstance();
         scriptOutWriter.initialize(scriptOut, hconf);
 
-        RecordReader scriptOutputReader = conf.getOutRecordReaderClass().newInstance();
-        scriptOutputReader.initialize(scriptIn, hconf, conf.getScriptOutputInfo().getProperties());
-
-        outThread = new StreamThread(scriptOutputReader, new OutputStreamProcessor(
-                                                                                   scriptOutputDeserializer.getObjectInspector()), "OutputProcessor");
-
-        RecordReader scriptErrReader = conf.getOutRecordReaderClass().newInstance();
-        scriptErrReader.initialize(scriptErr, hconf, conf.getScriptOutputInfo().getProperties());
-
-        errThread = new StreamThread(scriptErrReader,
-                                     new ErrorStreamProcessor
-                                     (HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
-                                     "ErrorProcessor");
-
-        if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) {
-          autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,
-                                              Utilities.getDefaultNotificationInterval(hconf));
+        RecordReader scriptOutputReader = conf.getOutRecordReaderClass()
+            .newInstance();
+        scriptOutputReader.initialize(scriptIn, hconf, conf
+            .getScriptOutputInfo().getProperties());
+
+        outThread = new StreamThread(scriptOutputReader,
+            new OutputStreamProcessor(scriptOutputDeserializer
+                .getObjectInspector()), "OutputProcessor");
+
+        RecordReader scriptErrReader = conf.getOutRecordReaderClass()
+            .newInstance();
+        scriptErrReader.initialize(scriptErr, hconf, conf.getScriptOutputInfo()
+            .getProperties());
+
+        errThread = new StreamThread(scriptErrReader, new ErrorStreamProcessor(
+            HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
+            "ErrorProcessor");
+
+        if (HiveConf
+            .getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) {
+          autoProgressor = new AutoProgressor(this.getClass().getName(),
+              reporter, Utilities.getDefaultNotificationInterval(hconf));
           autoProgressor.go();
         }
 
         outThread.start();
         errThread.start();
       } catch (Exception e) {
-        throw new HiveException ("Cannot initialize ScriptOperator", e);
+        throw new HiveException("Cannot initialize ScriptOperator", e);
       }
     }
 
-    if(scriptError != null) {
+    if (scriptError != null) {
       throw new HiveException(scriptError);
     }
 
     try {
-      Writable res = scriptInputSerializer.serialize(row, inputObjInspectors[tag]);
+      Writable res = scriptInputSerializer.serialize(row,
+          inputObjInspectors[tag]);
       scriptOutWriter.write(res);
     } catch (SerDeException e) {
       LOG.error("Error in serializing the row: " + e.getMessage());
@@ -305,12 +326,13 @@
       serialize_error_count.set(serialize_error_count.get() + 1);
       throw new HiveException(e);
     } catch (IOException e) {
-      if(isBrokenPipeException(e) && allowPartialConsumption()) {
+      if (isBrokenPipeException(e) && allowPartialConsumption()) {
         setDone(true);
-        LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done");
+        LOG
+            .warn("Got broken pipe during write: ignoring exception and setting operator to done");
       } else {
         LOG.error("Error in writing to script: " + e.getMessage());
-        if(isBrokenPipeException(e)) {
+        if (isBrokenPipeException(e)) {
           displayBrokenPipeInfo();
         }
         scriptError = e;
@@ -319,40 +341,45 @@
     }
   }
 
+  @Override
   public void close(boolean abort) throws HiveException {
 
     boolean new_abort = abort;
-    if(!abort) {
-      if(scriptError != null) {
+    if (!abort) {
+      if (scriptError != null) {
         throw new HiveException(scriptError);
       }
       // everything ok. try normal shutdown
       try {
         try {
-          if (scriptOutWriter != null)
+          if (scriptOutWriter != null) {
             scriptOutWriter.close();
+          }
         } catch (IOException e) {
-          if(isBrokenPipeException(e) && allowPartialConsumption()) {
+          if (isBrokenPipeException(e) && allowPartialConsumption()) {
             LOG.warn("Got broken pipe: ignoring exception");
           } else {
-            if(isBrokenPipeException(e)) {
+            if (isBrokenPipeException(e)) {
               displayBrokenPipeInfo();
             }
             throw e;
           }
         }
         int exitVal = 0;
-        if (scriptPid != null)
+        if (scriptPid != null) {
           exitVal = scriptPid.waitFor();
+        }
         if (exitVal != 0) {
           LOG.error("Script failed with code " + exitVal);
           new_abort = true;
-        };
+        }
+        ;
       } catch (IOException e) {
         LOG.error("Got ioexception: " + e.getMessage());
         e.printStackTrace();
         new_abort = true;
-      } catch (InterruptedException e) { }
+      } catch (InterruptedException e) {
+      }
 
     } else {
 
@@ -362,16 +389,17 @@
         // Interrupt the current thread after 1 second
         final Thread mythread = Thread.currentThread();
         Timer timer = new Timer(true);
-        timer.schedule(new TimerTask(){
+        timer.schedule(new TimerTask() {
           @Override
           public void run() {
             mythread.interrupt();
-          }},
-          1000);
+          }
+        }, 1000);
         // Wait for the child process to finish
         int exitVal = 0;
-        if (scriptPid != null)
+        if (scriptPid != null) {
           scriptPid.waitFor();
+        }
         // Cancel the timer
         timer.cancel();
         // Output the exit code
@@ -384,74 +412,82 @@
 
     // try these best effort
     try {
-      if (outThread != null)
+      if (outThread != null) {
         outThread.join(0);
+      }
     } catch (Exception e) {
-      LOG.warn("Exception in closing outThread: " + StringUtils.stringifyException(e));
+      LOG.warn("Exception in closing outThread: "
+          + StringUtils.stringifyException(e));
     }
 
     try {
-      if (errThread != null)
+      if (errThread != null) {
         errThread.join(0);
+      }
     } catch (Exception e) {
-      LOG.warn("Exception in closing errThread: " + StringUtils.stringifyException(e));
+      LOG.warn("Exception in closing errThread: "
+          + StringUtils.stringifyException(e));
     }
 
     try {
-      if (scriptPid != null)
+      if (scriptPid != null) {
         scriptPid.destroy();
+      }
     } catch (Exception e) {
-      LOG.warn("Exception in destroying scriptPid: " + StringUtils.stringifyException(e));
+      LOG.warn("Exception in destroying scriptPid: "
+          + StringUtils.stringifyException(e));
     }
 
     super.close(new_abort);
 
-    if(new_abort && !abort) {
-      throw new HiveException ("Hit error while closing ..");
+    if (new_abort && !abort) {
+      throw new HiveException("Hit error while closing ..");
     }
   }
 
-
   interface StreamProcessor {
     public void processLine(Writable line) throws HiveException;
+
     public void close() throws HiveException;
   }
 
-
   class OutputStreamProcessor implements StreamProcessor {
     Object row;
     ObjectInspector rowInspector;
+
     public OutputStreamProcessor(ObjectInspector rowInspector) {
       this.rowInspector = rowInspector;
     }
+
     public void processLine(Writable line) throws HiveException {
       try {
         row = scriptOutputDeserializer.deserialize(line);
       } catch (SerDeException e) {
-        deserialize_error_count.set(deserialize_error_count.get()+1);
+        deserialize_error_count.set(deserialize_error_count.get() + 1);
         return;
       }
       forward(row, rowInspector);
     }
+
     public void close() {
     }
   }
 
   /**
    * The processor for stderr stream.
-   *
-   * TODO: In the future when we move to hadoop 0.18 and above, we should borrow the logic
-   * from HadoopStreaming: PipeMapRed.java MRErrorThread to support counters and status
-   * updates.
+   * 
+   * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
+   * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
+   * counters and status updates.
    */
   class ErrorStreamProcessor implements StreamProcessor {
     private long bytesCopied = 0;
-    private long maxBytes;
+    private final long maxBytes;
 
     private long lastReportTime;
 
-    public ErrorStreamProcessor (int maxBytes) {
-      this.maxBytes = (long)maxBytes;
+    public ErrorStreamProcessor(int maxBytes) {
+      this.maxBytes = maxBytes;
       lastReportTime = 0;
     }
 
@@ -460,12 +496,14 @@
       String stringLine = line.toString();
       int len = 0;
 
-      if (line instanceof Text)
-        len = ((Text)line).getLength();
-      else if (line instanceof BytesWritable)
-        len = ((BytesWritable)line).getSize();
+      if (line instanceof Text) {
+        len = ((Text) line).getLength();
+      } else if (line instanceof BytesWritable) {
+        len = ((BytesWritable) line).getSize();
+      }
 
-      // Report progress for each stderr line, but no more frequently than once per minute.
+      // Report progress for each stderr line, but no more frequently than once
+      // per minute.
       long now = System.currentTimeMillis();
       // reporter is a member variable of the Operator class.
       if (now - lastReportTime > 60 * 1000 && reporter != null) {
@@ -474,21 +512,22 @@
         reporter.progress();
       }
 
-      if((maxBytes < 0) || (bytesCopied < maxBytes)) {
+      if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
         System.err.println(stringLine);
       }
       if (bytesCopied < maxBytes && bytesCopied + len >= maxBytes) {
         System.err.println("Operator " + id + " " + getName()
-            + ": exceeding stderr limit of " + maxBytes + " bytes, will truncate stderr messages.");
+            + ": exceeding stderr limit of " + maxBytes
+            + " bytes, will truncate stderr messages.");
       }
       bytesCopied += len;
     }
+
     public void close() {
     }
 
   }
 
-
   class StreamThread extends Thread {
 
     RecordReader in;
@@ -502,19 +541,20 @@
       setDaemon(true);
     }
 
+    @Override
     public void run() {
       try {
         Writable row = in.createRow();
 
-        while(true) {
+        while (true) {
           long bytes = in.next(row);
 
-          if(bytes <= 0) {
+          if (bytes <= 0) {
             break;
           }
           proc.processLine(row);
         }
-        LOG.info("StreamThread "+name+" done");
+        LOG.info("StreamThread " + name + " done");
 
       } catch (Throwable th) {
         scriptError = th;
@@ -534,27 +574,26 @@
   }
 
   /**
-   *  Wrap the script in a wrapper that allows admins to control
+   * Wrap the script in a wrapper that allows admins to control
    **/
-  protected String [] addWrapper(String [] inArgs) {
+  protected String[] addWrapper(String[] inArgs) {
     String wrapper = HiveConf.getVar(hconf, HiveConf.ConfVars.SCRIPTWRAPPER);
-    if(wrapper == null) {
+    if (wrapper == null) {
       return inArgs;
     }
 
-    String [] wrapComponents = splitArgs(wrapper);
+    String[] wrapComponents = splitArgs(wrapper);
     int totallength = wrapComponents.length + inArgs.length;
-    String [] finalArgv = new String [totallength];
-    for(int i=0; i<wrapComponents.length; i++) {
+    String[] finalArgv = new String[totallength];
+    for (int i = 0; i < wrapComponents.length; i++) {
       finalArgv[i] = wrapComponents[i];
     }
-    for(int i=0; i < inArgs.length; i++) {
-      finalArgv[wrapComponents.length+i] = inArgs[i];
+    for (int i = 0; i < inArgs.length; i++) {
+      finalArgv[wrapComponents.length + i] = inArgs[i];
     }
     return (finalArgv);
   }
 
-
   // Code below shameless borrowed from Hadoop Streaming
 
   public static String[] splitArgs(String args) {
@@ -612,6 +651,7 @@
     return "SCR";
   }
 
+  @Override
   public int getType() {
     return OperatorType.SCRIPT;
   }
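
[Editor's note] One small piece of the operator above worth calling out is safeEnvVarName: every character of a configuration key outside [0-9A-Za-z] is replaced with '_' before the key is exported into the child script's environment. A standalone sketch of the same transformation follows; the example key is hypothetical.

    public class SafeEnvVarNameSketch {

      static String safeEnvVarName(String var) {
        StringBuilder safe = new StringBuilder();
        for (int i = 0; i < var.length(); i++) {
          char c = var.charAt(i);
          boolean keep = (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')
              || (c >= 'a' && c <= 'z');
          safe.append(keep ? c : '_');
        }
        return safe.toString();
      }

      public static void main(String[] args) {
        // A dotted configuration key becomes a legal environment variable name.
        System.out.println(safeEnvVarName("some.example.conf.key"));  // some_example_conf_key
      }
    }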

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Jan 21 10:37:58 2010
@@ -31,44 +31,47 @@
 /**
  * Select operator implementation
  **/
-public class SelectOperator extends Operator <selectDesc> implements Serializable {
+public class SelectOperator extends Operator<selectDesc> implements
+    Serializable {
 
   private static final long serialVersionUID = 1L;
   transient protected ExprNodeEvaluator[] eval;
 
   transient Object[] output;
-  
+
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-    	initializeChildren(hconf);
+      initializeChildren(hconf);
       return;
     }
-    
+
     ArrayList<exprNodeDesc> colList = conf.getColList();
     eval = new ExprNodeEvaluator[colList.size()];
-    for(int i=0; i<colList.size(); i++) {
-      assert(colList.get(i) != null);
+    for (int i = 0; i < colList.size(); i++) {
+      assert (colList.get(i) != null);
       eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
     }
-   
+
     output = new Object[eval.length];
-    LOG.info("SELECT " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
+    LOG.info("SELECT "
+        + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
     outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf
-          .getOutputColumnNames(), inputObjInspectors[0]);
+        .getOutputColumnNames(), inputObjInspectors[0]);
     initializeChildren(hconf);
   }
 
-  public void processOp(Object row, int tag)
-      throws HiveException {
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
 
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
       forward(row, inputObjInspectors[tag]);
       return;
     }
-    
-    for(int i=0; i<eval.length; i++) {
+
+    for (int i = 0; i < eval.length; i++) {
       try {
         output[i] = eval[i].evaluate(row);
       } catch (HiveException e) {
@@ -88,7 +91,8 @@
   public String getName() {
     return new String("SEL");
   }
-  
+
+  @Override
   public int getType() {
     return OperatorType.SELECT;
   }
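
[Editor's note] The operator above boils down to evaluating one expression per output column and forwarding the resulting array. A toy analogue of that loop is sketched below; it is not Hive code, and the Evaluator interface is invented for illustration in place of ExprNodeEvaluator.

    public class SelectSketch {

      // Stand-in for Hive's ExprNodeEvaluator.
      interface Evaluator {
        Object evaluate(Object row);
      }

      private final Evaluator[] eval;
      private final Object[] output;

      public SelectSketch(Evaluator[] eval) {
        this.eval = eval;
        this.output = new Object[eval.length];
      }

      // Evaluate every expression against the row; in Hive the result would be
      // forwarded to the child operators.
      public Object[] processRow(Object row) {
        for (int i = 0; i < eval.length; i++) {
          output[i] = eval[i].evaluate(row);
        }
        return output;
      }
    }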

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Thu Jan 21 10:37:58 2010
@@ -66,40 +66,40 @@
  * Right now, we use one file per skew key.
  * 
  * <p>
- * For more info, please see
- * https://issues.apache.org/jira/browse/HIVE-964.
+ * For more info, please see https://issues.apache.org/jira/browse/HIVE-964.
  * 
  */
 public class SkewJoinHandler {
-  
-  static final protected Log LOG = LogFactory.getLog(SkewJoinHandler.class.getName());
-  
+
+  static final protected Log LOG = LogFactory.getLog(SkewJoinHandler.class
+      .getName());
+
   public int currBigKeyTag = -1;
-  
+
   private int rowNumber = 0;
   private int currTag = -1;
-  
+
   private int skewKeyDefinition = -1;
-  private Map<Byte,StructObjectInspector> skewKeysTableObjectInspector = null;
-  private Map<Byte,SerDe> tblSerializers = null;
+  private Map<Byte, StructObjectInspector> skewKeysTableObjectInspector = null;
+  private Map<Byte, SerDe> tblSerializers = null;
   private Map<Byte, tableDesc> tblDesc = null;
-  
+
   private Map<Byte, Boolean> bigKeysExistingMap = null;
-  
+
   Configuration hconf = null;
   List<Object> dummyKey = null;
   String taskId;
-  
-  private CommonJoinOperator<? extends Serializable> joinOp;
-  private int numAliases;
-  private joinDesc conf;
-  
+
+  private final CommonJoinOperator<? extends Serializable> joinOp;
+  private final int numAliases;
+  private final joinDesc conf;
+
   public SkewJoinHandler(CommonJoinOperator<? extends Serializable> joinOp) {
     this.joinOp = joinOp;
-    this.numAliases = joinOp.numAliases;
-    this.conf = joinOp.getConf();
+    numAliases = joinOp.numAliases;
+    conf = joinOp.getConf();
   }
-  
+
   public void initiliaze(Configuration hconf) {
     this.hconf = hconf;
     joinDesc desc = joinOp.getConf();
@@ -114,7 +114,7 @@
     for (int i = 0; i < numAliases; i++) {
       Byte alias = conf.getTagOrder()[i];
       List<ObjectInspector> skewTableKeyInspectors = new ArrayList<ObjectInspector>();
-      StructObjectInspector soi = (StructObjectInspector) this.joinOp.inputObjInspectors[alias];
+      StructObjectInspector soi = (StructObjectInspector) joinOp.inputObjInspectors[alias];
       StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
           .toString());
       List<? extends StructField> keyFields = ((StructObjectInspector) sf
@@ -124,7 +124,8 @@
         skewTableKeyInspectors.add(keyFields.get(k).getFieldObjectInspector());
       }
       tableDesc joinKeyDesc = desc.getKeyTableDesc();
-      List<String> keyColNames = Utilities.getColumnNames(joinKeyDesc.getProperties());
+      List<String> keyColNames = Utilities.getColumnNames(joinKeyDesc
+          .getProperties());
       StructObjectInspector structTblKeyInpector = ObjectInspectorFactory
           .getStandardStructObjectInspector(keyColNames, skewTableKeyInspectors);
 
@@ -135,21 +136,23 @@
         tblSerializers.put((byte) i, serializer);
       } catch (SerDeException e) {
         LOG.error("Skewjoin will be disabled due to " + e.getMessage(), e);
-        this.joinOp.handleSkewJoin = false;
+        joinOp.handleSkewJoin = false;
         break;
       }
 
-      tableDesc valTblDesc = this.joinOp.getSpillTableDesc(alias);
+      tableDesc valTblDesc = joinOp.getSpillTableDesc(alias);
       List<String> valColNames = new ArrayList<String>();
-      if (valTblDesc != null)
+      if (valTblDesc != null) {
         valColNames = Utilities.getColumnNames(valTblDesc.getProperties());
+      }
       StructObjectInspector structTblValInpector = ObjectInspectorFactory
           .getStandardStructObjectInspector(valColNames,
-              this.joinOp.joinValuesStandardObjectInspectors.get((byte) i));
+              joinOp.joinValuesStandardObjectInspectors.get((byte) i));
 
       StructObjectInspector structTblInpector = ObjectInspectorFactory
-          .getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[] {
-                  structTblValInpector,structTblKeyInpector }));
+          .getUnionStructObjectInspector(Arrays
+              .asList(new StructObjectInspector[] { structTblValInpector,
+                  structTblKeyInpector }));
       skewKeysTableObjectInspector.put((byte) i, structTblInpector);
     }
 
@@ -165,50 +168,57 @@
       }
     }
   }
-  
+
   void endGroup() throws IOException, HiveException {
-    if(skewKeyInCurrentGroup) {
-      
-      String specPath = conf.getBigKeysDirMap().get((byte)currBigKeyTag);
-      RowContainer<ArrayList<Object>> bigKey =  joinOp.storage.get(Byte.valueOf((byte)currBigKeyTag));
-      Path outputPath =  getOperatorOutputPath(specPath);
+    if (skewKeyInCurrentGroup) {
+
+      String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
+      RowContainer<ArrayList<Object>> bigKey = joinOp.storage.get(Byte
+          .valueOf((byte) currBigKeyTag));
+      Path outputPath = getOperatorOutputPath(specPath);
       FileSystem destFs = outputPath.getFileSystem(hconf);
       bigKey.copyToDFSDirecory(destFs, outputPath);
-      
+
       for (int i = 0; i < numAliases; i++) {
-        if (((byte)i) == currBigKeyTag) continue;
-        RowContainer<ArrayList<Object>> values =  joinOp.storage.get(Byte.valueOf((byte)i));
-        if(values != null) {
-          specPath = conf.getSmallKeysDirMap().get((byte)currBigKeyTag).get((byte)i);
+        if (((byte) i) == currBigKeyTag) {
+          continue;
+        }
+        RowContainer<ArrayList<Object>> values = joinOp.storage.get(Byte
+            .valueOf((byte) i));
+        if (values != null) {
+          specPath = conf.getSmallKeysDirMap().get((byte) currBigKeyTag).get(
+              (byte) i);
           values.copyToDFSDirecory(destFs, getOperatorOutputPath(specPath));
         }
       }
     }
     skewKeyInCurrentGroup = false;
   }
-  
+
   boolean skewKeyInCurrentGroup = false;
+
   public void handleSkew(int tag) throws HiveException {
 
-    if(joinOp.newGroupStarted || tag != currTag) {
+    if (joinOp.newGroupStarted || tag != currTag) {
       rowNumber = 0;
       currTag = tag;
     }
-    
-    if(joinOp.newGroupStarted) {
+
+    if (joinOp.newGroupStarted) {
       currBigKeyTag = -1;
       joinOp.newGroupStarted = false;
-      dummyKey = (List<Object>)joinOp.getGroupKeyObject();
+      dummyKey = (List<Object>) joinOp.getGroupKeyObject();
       skewKeyInCurrentGroup = false;
-      
+
       for (int i = 0; i < numAliases; i++) {
-        RowContainer<ArrayList<Object>> rc =  joinOp.storage.get(Byte.valueOf((byte)i));
-        if(rc != null) {
+        RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+            .valueOf((byte) i));
+        if (rc != null) {
           rc.setKeyObject(dummyKey);
         }
       }
     }
-    
+
     rowNumber++;
     if (currBigKeyTag == -1 && (tag < numAliases - 1)
         && rowNumber >= skewKeyDefinition) {
@@ -216,14 +226,15 @@
       // table (the last table can always be streamed), we define that we get
       // a skew key now.
       currBigKeyTag = tag;
-      
+
       // right now we assume that the group by is an ArrayList object. It may
       // change in future.
-      if(! (dummyKey instanceof List)) 
+      if (!(dummyKey instanceof List)) {
         throw new RuntimeException("Bug in handle skew key in a seperate job.");
-      
+      }
+
       skewKeyInCurrentGroup = true;
-      bigKeysExistingMap.put(Byte.valueOf((byte)currBigKeyTag), Boolean.TRUE);
+      bigKeysExistingMap.put(Byte.valueOf((byte) currBigKeyTag), Boolean.TRUE);
     }
   }
 
@@ -240,8 +251,9 @@
 
         // if we did not see a skew key in this table, continue to next
         // table
-        if (!bigKeysExistingMap.get((byte) bigKeyTbl))
+        if (!bigKeysExistingMap.get((byte) bigKeyTbl)) {
           continue;
+        }
 
         try {
           String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
@@ -249,8 +261,9 @@
           FileSystem fs = bigKeyPath.getFileSystem(hconf);
           delete(bigKeyPath, fs);
           for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
-            if (((byte) smallKeyTbl) == bigKeyTbl)
+            if (((byte) smallKeyTbl) == bigKeyTbl) {
               continue;
+            }
             specPath = conf.getSmallKeysDirMap().get((byte) bigKeyTbl).get(
                 (byte) smallKeyTbl);
             delete(getOperatorOutputPath(specPath), fs);
@@ -272,26 +285,30 @@
 
   private void commit() throws IOException {
     for (int bigKeyTbl = 0; bigKeyTbl < numAliases; bigKeyTbl++) {
-      
+
       // if we did not see a skew key in this table, continue to next table
       // we are trying to avoid an extra call of FileSystem.exists()
-      Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte)bigKeyTbl));
-      if (existing == null || !existing)
+      Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte) bigKeyTbl));
+      if (existing == null || !existing) {
         continue;
-      
-      String specPath = conf.getBigKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl));
+      }
+
+      String specPath = conf.getBigKeysDirMap().get(
+          Byte.valueOf((byte) bigKeyTbl));
       commitOutputPathToFinalPath(specPath, false);
       for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
-        if ( smallKeyTbl == bigKeyTbl)
+        if (smallKeyTbl == bigKeyTbl) {
           continue;
-        specPath = conf.getSmallKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl)).get(
-            Byte.valueOf((byte) smallKeyTbl));
+        }
+        specPath = conf.getSmallKeysDirMap()
+            .get(Byte.valueOf((byte) bigKeyTbl)).get(
+                Byte.valueOf((byte) smallKeyTbl));
         // the file may not exist, and we just ignore this
         commitOutputPathToFinalPath(specPath, true);
       }
     }
   }
-  
+
   private void commitOutputPathToFinalPath(String specPath,
       boolean ignoreNonExisting) throws IOException {
     Path outPath = getOperatorOutputPath(specPath);
@@ -304,23 +321,25 @@
         throw new IOException("Unable to rename output to: " + finalPath);
       }
     } catch (FileNotFoundException e) {
-      if (!ignoreNonExisting)
+      if (!ignoreNonExisting) {
         throw e;
+      }
     } catch (IOException e) {
-      if (!fs.exists(outPath) && ignoreNonExisting)
+      if (!fs.exists(outPath) && ignoreNonExisting) {
         return;
+      }
       throw e;
     }
   }
-  
+
   private Path getOperatorOutputPath(String specPath) throws IOException {
     Path tmpPath = Utilities.toTempPath(specPath);
     return new Path(tmpPath, Utilities.toTempPath(taskId));
   }
-  
+
   private Path getOperatorFinalPath(String specPath) throws IOException {
     Path tmpPath = Utilities.toTempPath(specPath);
     return new Path(tmpPath, taskId);
   }
-  
+
 }
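
The commit() logic above is the usual write-to-a-temp-path-then-rename pattern: spilled rows land under a per-task temporary path and are promoted to the final path with a single rename, with missing small-key files tolerated. A minimal sketch of that pattern, assuming only a Hadoop Configuration and hypothetical temp/final paths rather than the operator's actual fields:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameCommitSketch {
      // Promote a task's temporary output to its final location; optionally
      // tolerate a missing temp path (e.g. a small-key file that was never spilled).
      static void commit(Configuration conf, Path tmpPath, Path finalPath,
          boolean ignoreNonExisting) throws IOException {
        FileSystem fs = tmpPath.getFileSystem(conf);
        if (!fs.exists(tmpPath)) {
          if (ignoreNonExisting) {
            return;
          }
          throw new FileNotFoundException(tmpPath.toString());
        }
        if (!fs.rename(tmpPath, finalPath)) {
          throw new IOException("Unable to rename " + tmpPath + " to " + finalPath);
        }
      }
    }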

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Jan 21 10:37:58 2010
@@ -25,32 +25,37 @@
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 
 /**
- * Table Scan Operator
- * If the data is coming from the map-reduce framework, just forward it.
- * This will be needed as part of local work when data is not being read as part of map-reduce framework
+ * Table Scan Operator. If the data is coming from the map-reduce framework,
+ * just forward it. This will be needed as part of local work when data is not
+ * being read as part of the map-reduce framework.
  **/
-public class TableScanOperator extends Operator<tableScanDesc> implements Serializable {
+public class TableScanOperator extends Operator<tableScanDesc> implements
+    Serializable {
   private static final long serialVersionUID = 1L;
 
   /**
-   * Currently, the table scan operator does not do anything special other than just forwarding the row. Since the 
-   * table data is always read as part of the map-reduce framework by the mapper. But, this assumption is not true,
-   * i.e table data is not only read by the mapper, this operator will be enhanced to read the table.
+   * Currently, the table scan operator does not do anything special other than
+   * just forwarding the row, since the table data is always read by the mapper
+   * as part of the map-reduce framework. Once this assumption no longer holds,
+   * i.e. table data is not read only by the mapper, this operator will be
+   * enhanced to read the table.
    **/
   @Override
-  public void processOp(Object row, int tag)
-      throws HiveException {
-    forward(row, inputObjInspectors[tag]);    
+  public void processOp(Object row, int tag) throws HiveException {
+    forward(row, inputObjInspectors[tag]);
   }
 
   /**
-   * The operator name for this operator type. This is used to construct the rule for an operator
+   * The operator name for this operator type. This is used to construct the
+   * rule for an operator.
+   * 
    * @return the operator name
    **/
+  @Override
   public String getName() {
     return new String("TS");
   }
-  
+
   // this 'neededColumnIDs' field is included in this operator class instead of
   // its desc class.The reason is that 1)tableScanDesc can not be instantiated,
   // and 2) it will fail some join and union queries if this is added forcibly
@@ -58,13 +63,14 @@
   java.util.ArrayList<Integer> neededColumnIDs;
 
   public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
-    this.neededColumnIDs = orign_columns;
+    neededColumnIDs = orign_columns;
   }
 
   public java.util.ArrayList<Integer> getNeededColumnIDs() {
     return neededColumnIDs;
   }
 
+  @Override
   public int getType() {
     return OperatorType.TABLESCAN;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Thu Jan 21 10:37:58 2010
@@ -18,9 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -31,15 +38,12 @@
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-
 /**
  * Task implementation
  **/
 
-public abstract class Task <T extends Serializable> implements Serializable, Node {
+public abstract class Task<T extends Serializable> implements Serializable,
+    Node {
 
   private static final long serialVersionUID = 1L;
   transient protected boolean started;
@@ -68,7 +72,8 @@
     this.taskCounters = new HashMap<String, Long>();
   }
 
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+  public void initialize(HiveConf conf, QueryPlan queryPlan,
+      DriverContext driverContext) {
     this.queryPlan = queryPlan;
     isdone = false;
     started = false;
@@ -76,21 +81,22 @@
     this.conf = conf;
 
     try {
-        db = Hive.get(conf);
+      db = Hive.get(conf);
     } catch (HiveException e) {
       // Bail out ungracefully - we should never hit
       // this here - but would have hit it in SemanticAnalyzer
       LOG.error(StringUtils.stringifyException(e));
-      throw new RuntimeException (e);
+      throw new RuntimeException(e);
     }
     this.driverContext = driverContext;
-    
+
     console = new LogHelper(LOG);
   }
 
   /**
-   * This method is called in the Driver on every task. It updates counters
-   * and calls execute(), which is overridden in each task
+   * This method is called in the Driver on every task. It updates counters and
+   * calls execute(), which is overridden in each task
+   * 
    * @return return value of execute()
    */
   public int executeTask() {
@@ -112,32 +118,35 @@
   }
 
   /**
-   * This method is overridden in each Task.
-   * TODO execute should return a TaskHandle.
+   * This method is overridden in each Task. TODO execute should return a
+   * TaskHandle.
+   * 
    * @return status of executing the task
    */
   protected abstract int execute();
-  
+
   /**
-   * Update the progress of the task within taskHandle and also
-   * dump the progress information to the history file
-   * @param taskHandle task handle returned by execute
-   * @throws IOException 
+   * Update the progress of the task within taskHandle and also dump the
+   * progress information to the history file
+   * 
+   * @param taskHandle
+   *          task handle returned by execute
+   * @throws IOException
    */
   public void progress(TaskHandle taskHandle) throws IOException {
     // do nothing by default
   }
-  
+
   // dummy method - FetchTask overwrites this
-  public boolean fetch(Vector<String> res) throws IOException { 
+  public boolean fetch(Vector<String> res) throws IOException {
     assert false;
-  	return false;
+    return false;
   }
 
   public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
     this.childTasks = childTasks;
   }
-  
+
   public List<? extends Node> getChildren() {
     return getChildTasks();
   }
@@ -155,7 +164,9 @@
   }
 
   /**
-   * Add a dependent task on the current task. Return if the dependency already existed or is this a new one
+   * Add a dependent task on the current task. Return whether the dependency
+   * already existed or this is a new one.
+   * 
    * @return true if the task got added false if it already existed
    */
   public boolean addDependentTask(Task<? extends Serializable> dependent) {
@@ -178,13 +189,17 @@
 
   /**
    * remove the dependent task
-   * @param dependent the task to remove
+   * 
+   * @param dependent
+   *          the task to remove
    */
   public void removeDependentTask(Task<? extends Serializable> dependent) {
     if ((getChildTasks() != null) && (getChildTasks().contains(dependent))) {
       getChildTasks().remove(dependent);
-      if ((dependent.getParentTasks() != null) && (dependent.getParentTasks().contains(this)))
+      if ((dependent.getParentTasks() != null)
+          && (dependent.getParentTasks().contains(this))) {
         dependent.getParentTasks().remove(this);
+      }
     }
   }
 
@@ -223,7 +238,7 @@
   public boolean isRunnable() {
     boolean isrunnable = true;
     if (parentTasks != null) {
-      for(Task<? extends Serializable> parent: parentTasks) {
+      for (Task<? extends Serializable> parent : parentTasks) {
         if (!parent.done()) {
           isrunnable = false;
           break;
@@ -269,8 +284,8 @@
   }
 
   /**
-   * Should be overridden to return the type of the specific task among
-   * the types in TaskType
+   * Should be overridden to return the type of the specific task among the
+   * types in TaskType
    * 
    * @return TaskType.* or -1 if not overridden
    */
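
To make the Task contract concrete: a new task type parameterizes Task with its work descriptor and implements execute(); counters, child/parent wiring and the done/runnable state come from the base class. A minimal sketch, using a hypothetical noopWork descriptor and assuming execute() is the only method a subclass must provide:

    import java.io.Serializable;

    import org.apache.hadoop.hive.ql.exec.Task;

    // Hypothetical work descriptor for illustration; real work classes live in
    // org.apache.hadoop.hive.ql.plan.
    class noopWork implements Serializable {
      private static final long serialVersionUID = 1L;
    }

    class NoopTask extends Task<noopWork> {
      private static final long serialVersionUID = 1L;

      @Override
      protected int execute() {
        // A real task would do its work here and translate failures into a
        // non-zero exit code.
        return 0; // 0 signals success back to executeTask()/Driver
      }
    }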

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Thu Jan 21 10:37:58 2010
@@ -18,11 +18,19 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.*;
-import java.io.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+import org.apache.hadoop.hive.ql.plan.copyWork;
+import org.apache.hadoop.hive.ql.plan.explainWork;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.moveWork;
 
 /**
  * TaskFactory implementation
@@ -46,38 +54,43 @@
     taskvec.add(new taskTuple<fetchWork>(fetchWork.class, FetchTask.class));
     taskvec.add(new taskTuple<copyWork>(copyWork.class, CopyTask.class));
     taskvec.add(new taskTuple<DDLWork>(DDLWork.class, DDLTask.class));
-    taskvec.add(new taskTuple<FunctionWork>(FunctionWork.class, FunctionTask.class));
-    taskvec.add(new taskTuple<explainWork>(explainWork.class, ExplainTask.class));
-    taskvec.add(new taskTuple<ConditionalWork>(ConditionalWork.class, ConditionalTask.class));
+    taskvec.add(new taskTuple<FunctionWork>(FunctionWork.class,
+        FunctionTask.class));
+    taskvec
+        .add(new taskTuple<explainWork>(explainWork.class, ExplainTask.class));
+    taskvec.add(new taskTuple<ConditionalWork>(ConditionalWork.class,
+        ConditionalTask.class));
     // we are taking this out to allow us to instantiate either MapRedTask or
     // ExecDriver dynamically at run time based on configuration
-    // taskvec.add(new taskTuple<mapredWork>(mapredWork.class, ExecDriver.class));
+    // taskvec.add(new taskTuple<mapredWork>(mapredWork.class,
+    // ExecDriver.class));
   }
 
-  private static ThreadLocal<Integer> tid = new ThreadLocal<Integer> () {
+  private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
+    @Override
     protected synchronized Integer initialValue() {
-        return new Integer(0);
-      }
+      return new Integer(0);
+    }
   };
 
   public static int getAndIncrementId() {
     int curValue = tid.get().intValue();
-    tid.set(new Integer(curValue+1));
+    tid.set(new Integer(curValue + 1));
     return curValue;
   }
 
-  
   public static void resetId() {
     tid.set(new Integer(0));
   }
-  
+
   @SuppressWarnings("unchecked")
-  public static <T extends Serializable> Task<T> get(Class<T> workClass, HiveConf conf) {
-      
-    for(taskTuple<? extends Serializable> t: taskvec) {
-      if(t.workClass == workClass) {
+  public static <T extends Serializable> Task<T> get(Class<T> workClass,
+      HiveConf conf) {
+
+    for (taskTuple<? extends Serializable> t : taskvec) {
+      if (t.workClass == workClass) {
         try {
-          Task<T> ret = (Task<T>)t.taskClass.newInstance();
+          Task<T> ret = (Task<T>) t.taskClass.newInstance();
           ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
           return ret;
         } catch (Exception e) {
@@ -85,57 +98,58 @@
         }
       }
     }
-    
-    if(workClass == mapredWork.class) {
+
+    if (workClass == mapredWork.class) {
 
       boolean viachild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
-      
+
       try {
 
         // in local mode - or if otherwise so configured - always submit
         // jobs via separate jvm
         Task<T> ret = null;
-        if(conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local") || viachild) {
-          ret = (Task<T>)MapRedTask.class.newInstance();
+        if (conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local") || viachild) {
+          ret = (Task<T>) MapRedTask.class.newInstance();
         } else {
-          ret = (Task<T>)ExecDriver.class.newInstance();
+          ret = (Task<T>) ExecDriver.class.newInstance();
         }
         ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
         return ret;
       } catch (Exception e) {
-        throw new RuntimeException (e.getMessage(), e);
+        throw new RuntimeException(e.getMessage(), e);
       }
 
     }
 
-    throw new RuntimeException ("No task for work class " + workClass.getName());
+    throw new RuntimeException("No task for work class " + workClass.getName());
   }
 
   public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
-             Task<? extends Serializable> ... tasklist) {
-    Task<T> ret = get((Class <T>)work.getClass(), conf);
+      Task<? extends Serializable>... tasklist) {
+    Task<T> ret = get((Class<T>) work.getClass(), conf);
     ret.setWork(work);
-    if(tasklist.length == 0)
+    if (tasklist.length == 0) {
       return (ret);
+    }
 
-    ArrayList<Task<? extends Serializable>> clist = new ArrayList<Task<? extends Serializable>> ();
-    for(Task<? extends Serializable> tsk: tasklist) {
+    ArrayList<Task<? extends Serializable>> clist = new ArrayList<Task<? extends Serializable>>();
+    for (Task<? extends Serializable> tsk : tasklist) {
       clist.add(tsk);
     }
     ret.setChildTasks(clist);
     return (ret);
   }
 
-  public static <T extends Serializable> Task<T> getAndMakeChild(
-          T work, HiveConf conf,
-          Task<? extends Serializable> ... tasklist) {
-    Task<T> ret = get((Class <T>)work.getClass(), conf);
+  public static <T extends Serializable> Task<T> getAndMakeChild(T work,
+      HiveConf conf, Task<? extends Serializable>... tasklist) {
+    Task<T> ret = get((Class<T>) work.getClass(), conf);
     ret.setWork(work);
-    if(tasklist.length == 0)
+    if (tasklist.length == 0) {
       return (ret);
+    }
 
     // Add the new task as child of each of the passed in tasks
-    for(Task<? extends Serializable> tsk: tasklist) {
+    for (Task<? extends Serializable> tsk : tasklist) {
       List<Task<? extends Serializable>> children = tsk.getChildTasks();
       if (children == null) {
         children = new ArrayList<Task<? extends Serializable>>();
@@ -143,7 +157,7 @@
       children.add(ret);
       tsk.setChildTasks(children);
     }
-    
+
     return (ret);
   }
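
The two variadic helpers differ only in the direction of the dependency they set up: get(work, conf, tasks...) makes the passed tasks children of the new task, while getAndMakeChild(work, conf, tasks...) hangs the new task under each of them. A small usage sketch, with empty plan objects standing in for ones built by the compiler (their no-arg constructors and HiveConf's no-arg constructor are assumed here for illustration):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.plan.mapredWork;
    import org.apache.hadoop.hive.ql.plan.moveWork;

    public class TaskFactorySketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(); // assumed no-arg constructor; normally the session's conf

        // Plan objects are normally built by the semantic analyzer; empty ones here.
        mapredWork mrWork = new mapredWork();
        moveWork mvWork = new moveWork();

        // MapRedTask vs. ExecDriver is picked at run time from the configuration.
        Task<mapredWork> mrTask = TaskFactory.get(mrWork, conf);

        // getAndMakeChild hangs the new move task under the map-reduce task.
        Task<moveWork> mvTask = TaskFactory.getAndMakeChild(mvWork, conf, mrTask);

        System.out.println(mrTask.getChildTasks().contains(mvTask)); // expected: true
      }
    }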
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java Thu Jan 21 10:37:58 2010
@@ -5,10 +5,13 @@
 import org.apache.hadoop.mapred.Counters;
 
 public class TaskHandle {
-  // The eventual goal is to monitor the progress of all the tasks, not only the map reduce task.
-  // The execute() method of the tasks will return immediately, and return a task specific handle to 
-  // monitor the progress of that task. 
-  // Right now, the behavior is kind of broken, ExecDriver's execute method calls progress - instead it should
+  // The eventual goal is to monitor the progress of all the tasks, not only
+  // the map reduce task. The execute() method of the tasks will return
+  // immediately, and return a task-specific handle to monitor the progress of
+  // that task.
+  // Right now, the behavior is kind of broken: ExecDriver's execute method
+  // calls progress - instead it should
   // be invoked by Driver
   public Counters getCounters() throws IOException {
     // default implementation

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java Thu Jan 21 10:37:58 2010
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.*;
 
 /**
  * TaskResult implementation
@@ -27,6 +26,7 @@
 public class TaskResult {
   protected int exitVal;
   protected boolean runStatus;
+
   public TaskResult() {
     exitVal = -1;
     setRunning(true);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Thu Jan 21 10:37:58 2010
@@ -18,19 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.util.StringUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
  * TaskRunner implementation
@@ -51,6 +41,7 @@
     return tsk;
   }
 
+  @Override
   public void run() {
     SessionState.start(ss);
     runSequential();
@@ -65,5 +56,4 @@
     result.setExitVal(exitVal);
   }
 
-
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java Thu Jan 21 10:37:58 2010
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.*;
+import java.io.Serializable;
 
 /**
  * Terminal Operator Base Class
  **/
-public abstract class TerminalOperator <T extends Serializable> extends Operator <T>
-  implements Serializable {
+public abstract class TerminalOperator<T extends Serializable> extends
+    Operator<T> implements Serializable {
   private static final long serialVersionUID = 1L;
 
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java Thu Jan 21 10:37:58 2010
@@ -22,20 +22,19 @@
 import java.io.InputStream;
 import java.util.Properties;
 
-import org.apache.hadoop.mapred.LineRecordReader.LineReader;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 
 public class TextRecordReader implements RecordReader {
 
-  private LineReader  lineReader;
+  private LineReader lineReader;
   private InputStream in;
-  private Text        row;
+  private Text row;
 
-  public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException {
+  public void initialize(InputStream in, Configuration conf, Properties tbl)
+      throws IOException {
     lineReader = new LineReader(in, conf);
     this.in = in;
   }
@@ -46,14 +45,16 @@
   }
 
   public int next(Writable row) throws IOException {
-    if (lineReader == null)
+    if (lineReader == null) {
       return -1;
+    }
 
-    return lineReader.readLine((Text)row);
+    return lineReader.readLine((Text) row);
   }
 
   public void close() throws IOException {
-    if (in != null)
+    if (in != null) {
       in.close();
+    }
   }
 }
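
The reader wraps an arbitrary InputStream (in practice the stdout of a transform script) and hands back one line per next() call. A minimal usage sketch, reading from an in-memory stream purely for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.TextRecordReader;
    import org.apache.hadoop.io.Text;

    public class TextRecordReaderSketch {
      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("a\tb\nc\td\n".getBytes("UTF-8"));

        TextRecordReader reader = new TextRecordReader();
        reader.initialize(in, new Configuration(), new Properties());

        // next() returns the number of bytes consumed for the line, 0 at end of stream.
        Text row = new Text();
        while (reader.next(row) > 0) {
          System.out.println(row); // prints "a\tb" then "c\td"
        }
        reader.close();
      }
    }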

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java Thu Jan 21 10:37:58 2010
@@ -22,19 +22,20 @@
 import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 public class TextRecordWriter implements RecordWriter {
 
   private OutputStream out;
 
-  public void initialize(OutputStream out, Configuration conf) throws IOException {
+  public void initialize(OutputStream out, Configuration conf)
+      throws IOException {
     this.out = out;
   }
 
   public void write(Writable row) throws IOException {
-    Text text = (Text)row;
+    Text text = (Text) row;
     out.write(text.getBytes(), 0, text.getLength());
     out.write(Utilities.newLineCode);
   }
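
The writer is the mirror image: each row's raw bytes go to the wrapped OutputStream followed by a newline. A short sketch, again against an in-memory stream purely for illustration:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.TextRecordWriter;
    import org.apache.hadoop.io.Text;

    public class TextRecordWriterSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        TextRecordWriter writer = new TextRecordWriter();
        writer.initialize(out, new Configuration());

        // Each row is written as its raw bytes followed by a newline byte.
        writer.write(new Text("a\tb"));
        writer.write(new Text("c\td"));

        System.out.print(out.toString("UTF-8")); // "a\tb\nc\td\n"
      }
    }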

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.*;
-import java.util.*;
-import java.util.regex.Pattern;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URL;
-import java.net.URLEncoder;
-import java.net.URLDecoder;
-import java.net.MalformedURLException;
-import java.net.InetSocketAddress;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobTracker;
 
 /*
  * Intelligence to make clients wait if the cluster is in a bad state.
@@ -51,14 +43,14 @@
   /**
    * fetch http://tracker.om:/gc.jsp?threshold=period
    */
-  static void checkJobTracker(JobConf conf, Log LOG)  {
+  static void checkJobTracker(JobConf conf, Log LOG) {
 
     try {
-      byte buffer[] = new byte[1024]; 
+      byte buffer[] = new byte[1024];
       int threshold = conf.getInt("mapred.throttle.threshold.percent",
-                                  DEFAULT_MEMORY_GC_PERCENT);
+          DEFAULT_MEMORY_GC_PERCENT);
       int retry = conf.getInt("mapred.throttle.retry.period",
-                              DEFAULT_RETRY_PERIOD);
+          DEFAULT_RETRY_PERIOD);
 
       // If the threshold is 100 percent, then there is no throttling
       if (threshold == 100) {
@@ -66,35 +58,35 @@
       }
 
       // This is the Job Tracker URL
-      String tracker = JobTrackerURLResolver.getURL(conf) +
-                       "/gc.jsp?threshold=" + threshold;
+      String tracker = JobTrackerURLResolver.getURL(conf)
+          + "/gc.jsp?threshold=" + threshold;
 
       while (true) {
         // read in the first 1K characters from the URL
         URL url = new URL(tracker);
         LOG.debug("Throttle: URL " + tracker);
         InputStream in = url.openStream();
-        int numRead = in.read(buffer);
+        in.read(buffer);
         in.close();
         String fetchString = new String(buffer);
 
         // fetch the xml tag <dogc>xxx</dogc>
-        Pattern dowait = Pattern.compile("<dogc>",
-                         Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+        Pattern dowait = Pattern.compile("<dogc>", Pattern.CASE_INSENSITIVE
+            | Pattern.DOTALL | Pattern.MULTILINE);
         String[] results = dowait.split(fetchString);
         if (results.length != 2) {
-          throw new IOException("Throttle: Unable to parse response of URL " + url + 
-                                ". Get retuned " + fetchString);
+          throw new IOException("Throttle: Unable to parse response of URL "
+              + url + ". Get returned " + fetchString);
         }
-        dowait = Pattern.compile("</dogc>",
-                         Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+        dowait = Pattern.compile("</dogc>", Pattern.CASE_INSENSITIVE
+            | Pattern.DOTALL | Pattern.MULTILINE);
         results = dowait.split(results[1]);
         if (results.length < 1) {
-          throw new IOException("Throttle: Unable to parse response of URL " + url + 
-                                ". Get retuned " + fetchString);
+          throw new IOException("Throttle: Unable to parse response of URL "
+              + url + ". Get returned " + fetchString);
         }
 
-        // if the jobtracker signalled that the threshold is not exceeded, 
+        // if the jobtracker signalled that the threshold is not exceeded,
         // then we return immediately.
         if (results[0].trim().compareToIgnoreCase("false") == 0) {
           return;
@@ -102,8 +94,8 @@
 
         // The JobTracker has exceeded its threshold and is doing a GC.
         // The client has to wait and retry.
-        LOG.warn("Job is being throttled because of resource crunch on the " +
-                 "JobTracker. Will retry in " + retry + " seconds..");
+        LOG.warn("Job is being throttled because of resource crunch on the "
+            + "JobTracker. Will retry in " + retry + " seconds..");
         Thread.sleep(retry * 1000L);
       }
     } catch (Exception e) {