You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/05 20:52:26 UTC

svn commit: r1615978 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: OperatorFactory.java ReduceSinkOperator.java mr/ExecMapper.java mr/ExecReducer.java tez/MapRecordProcessor.java tez/ReduceRecordProcessor.java

Author: szehon
Date: Tue Aug  5 18:52:26 2014
New Revision: 1615978

URL: http://svn.apache.org/r1615978
Log:
HIVE-7596 : Cleanup OperatorFactory, ReduceSinkOperator, and reportStats (Brock Noland via Szehon)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Aug  5 18:52:26 2014
@@ -64,23 +64,9 @@ import org.apache.hadoop.hive.ql.plan.Un
  *
  */
 public final class OperatorFactory {
+  private static final List<OpTuple> opvec;
+  private static final List<OpTuple> vectorOpvec;
 
-  /**
-   * OpTuple.
-   *
-   * @param <T>
-   */
-  public static final class OpTuple<T extends OperatorDesc> {
-    public Class<T> descClass;
-    public Class<? extends Operator<T>> opClass;
-
-    public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
-      this.descClass = descClass;
-      this.opClass = opClass;
-    }
-  }
-
-  public static ArrayList<OpTuple> opvec;
   static {
     opvec = new ArrayList<OpTuple>();
     opvec.add(new OpTuple<FilterDesc>(FilterDesc.class, FilterOperator.class));
@@ -116,7 +102,6 @@ public final class OperatorFactory {
         MuxOperator.class));
   }
 
-  public static ArrayList<OpTuple> vectorOpvec;
   static {
     vectorOpvec = new ArrayList<OpTuple>();
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
@@ -130,6 +115,17 @@ public final class OperatorFactory {
     vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class));
   }
 
+  private static final class OpTuple<T extends OperatorDesc> {
+    private final Class<T> descClass;
+    private final Class<? extends Operator<T>> opClass;
+
+    public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+      this.descClass = descClass;
+      this.opClass = opClass;
+    }
+  }
+
+
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
       VectorizationContext vContext) throws HiveException {
     Class<T> descClass = (Class<T>) conf.getClass();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Aug  5 18:52:26 2014
@@ -25,6 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -61,9 +63,20 @@ public class ReduceSinkOperator extends 
     PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
   }
 
+  private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
   private static final long serialVersionUID = 1L;
-  protected transient OutputCollector out;
+  private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
+
+  private transient ObjectInspector[] partitionObjectInspectors;
+  private transient ObjectInspector[] bucketObjectInspectors;
+  private transient int buckColIdxInKey;
+  private boolean firstRow;
+  private transient int tag;
+  private boolean skipTag = false;
+  private transient InspectableObject tempInspectableObject = new InspectableObject();
+  private transient int[] valueIndex; // index for value(+ from keys, - from values)
 
+  protected transient OutputCollector out;
   /**
    * The evaluators for the key columns. Key columns decide the sort order on
    * the reducer side. Key columns are passed to the reducer in the "key".
@@ -84,38 +97,40 @@ public class ReduceSinkOperator extends 
    * Evaluators for bucketing columns. This is used to compute bucket number.
    */
   protected transient ExprNodeEvaluator[] bucketEval = null;
-
-  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
-  // ready
+  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
   protected transient Serializer keySerializer;
   protected transient boolean keyIsText;
   protected transient Serializer valueSerializer;
-  transient int tag;
   protected transient byte[] tagByte = new byte[1];
-  transient protected int numDistributionKeys;
-  transient protected int numDistinctExprs;
-  transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
-  private boolean skipTag = false;
+  protected transient int numDistributionKeys;
+  protected transient int numDistinctExprs;
+  protected transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   protected transient boolean autoParallel = false;
-  
-  protected static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance();
-
-  private transient int[] valueIndex; // index for value(+ from keys, - from values)
-
-  public void setInputAliases(String[] inputAliases) {
-    this.inputAliases = inputAliases;
-  }
-
-  public String[] getInputAliases() {
-    return inputAliases;
-  }
-
-  public void setOutputCollector(OutputCollector _out) {
-    this.out = _out;
-  }
-
   // picks topN K:V pairs from input.
   protected transient TopNHash reducerHash = new TopNHash();
+  protected transient HiveKey keyWritable = new HiveKey();
+  protected transient ObjectInspector keyObjectInspector;
+  protected transient ObjectInspector valueObjectInspector;
+  protected transient Object[] cachedValues;
+  protected transient List<List<Integer>> distinctColIndices;
+  protected transient Random random;
+  /**
+   * This two dimensional array holds key data and a corresponding Union object
+   * which contains the tag identifying the aggregate expression for distinct columns.
+   *
+   * If there is no distict expression, cachedKeys is simply like this.
+   * cachedKeys[0] = [col0][col1]
+   *
+   * with two distict expression, union(tag:key) is attatched for each distinct expression
+   * cachedKeys[0] = [col0][col1][0:dist1]
+   * cachedKeys[1] = [col0][col1][1:dist2]
+   *
+   * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
+   * see {@link ExprNodeColumnEvaluator}
+   */
+  // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
+  protected transient Object[][] cachedKeys;
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
@@ -184,40 +199,12 @@ public class ReduceSinkOperator extends 
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
-      e.printStackTrace();
+      String msg = "Error initializing ReduceSinkOperator: " + e.getMessage();
+      LOG.error(msg, e);
       throw new RuntimeException(e);
     }
   }
 
-  transient InspectableObject tempInspectableObject = new InspectableObject();
-  protected transient HiveKey keyWritable = new HiveKey();
-
-  protected transient ObjectInspector keyObjectInspector;
-  protected transient ObjectInspector valueObjectInspector;
-  transient ObjectInspector[] partitionObjectInspectors;
-  transient ObjectInspector[] bucketObjectInspectors = null;
-  transient int buckColIdxInKey;
-
-  protected transient Object[] cachedValues;
-  protected transient List<List<Integer>> distinctColIndices;
-  /**
-   * This two dimensional array holds key data and a corresponding Union object
-   * which contains the tag identifying the aggregate expression for distinct columns.
-   *
-   * If there is no distict expression, cachedKeys is simply like this.
-   * cachedKeys[0] = [col0][col1]
-   *
-   * with two distict expression, union(tag:key) is attatched for each distinct expression
-   * cachedKeys[0] = [col0][col1][0:dist1]
-   * cachedKeys[1] = [col0][col1][1:dist2]
-   *
-   * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
-   * see {@link ExprNodeColumnEvaluator}
-   */
-  // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
-  protected transient Object[][] cachedKeys;
-  boolean firstRow;
-  protected transient Random random;
 
   /**
    * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
@@ -509,4 +496,16 @@ public class ReduceSinkOperator extends 
   public int[] getValueIndex() {
     return valueIndex;
   }
+
+  public void setInputAliases(String[] inputAliases) {
+    this.inputAliases = inputAliases;
+  }
+
+  public String[] getInputAliases() {
+    return inputAliases;
+  }
+
+  public void setOutputCollector(OutputCollector _out) {
+    this.out = _out;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Aug  5 18:52:26 2014
@@ -250,7 +250,7 @@ public class ExecMapper extends MapReduc
             + used_memory);
       }
 
-      reportStats rps = new reportStats(rp);
+      ReportStats rps = new ReportStats(rp);
       mo.preorderMap(rps);
       return;
     } catch (Exception e) {
@@ -285,10 +285,10 @@ public class ExecMapper extends MapReduc
    * reportStats.
    *
    */
-  public static class reportStats implements Operator.OperatorFunc {
-    Reporter rp;
+  public static class ReportStats implements Operator.OperatorFunc {
+    private final Reporter rp;
 
-    public reportStats(Reporter rp) {
+    public ReportStats(Reporter rp) {
       this.rp = rp;
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Aug  5 18:52:26 2014
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -306,7 +306,7 @@ public class ExecReducer extends MapRedu
       }
 
       reducer.close(abort);
-      reportStats rps = new reportStats(rp);
+      ReportStats rps = new ReportStats(rp);
       reducer.preorderMap(rps);
 
     } catch (Exception e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Aug  5 18:52:26 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
@@ -225,7 +225,7 @@ public class MapRecordProcessor extends 
       if (isLogInfoEnabled) {
         logCloseInfo();
       }
-      reportStats rps = new reportStats(reporter);
+      ReportStats rps = new ReportStats(reporter);
       mapOp.preorderMap(rps);
       return;
     } catch (Exception e) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1615978&r1=1615977&r2=1615978&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Aug  5 18:52:26 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
@@ -519,7 +519,7 @@ public class ReduceRecordProcessor  exte
           dummyOp.close(abort);
         }
       }
-      reportStats rps = new reportStats(reporter);
+      ReportStats rps = new ReportStats(reporter);
       reducer.preorderMap(rps);
 
     } catch (Exception e) {