Posted to commits@hive.apache.org by he...@apache.org on 2009/12/15 01:32:04 UTC

svn commit: r890582 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java

Author: heyongqiang
Date: Tue Dec 15 00:32:04 2009
New Revision: 890582

URL: http://svn.apache.org/viewvc?rev=890582&view=rev
Log:
HIVE-991. Union with 200 children fails.
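
Context for readers (a sketch, not part of the commit log): processOp() receives each row with an int tag identifying which parent sent it, and the operator looks up the matching ObjectInspector by that tag. The array holding those inspectors was sized Byte.MAX_VALUE (127 slots, valid tags 0..126), so a union with 200 branches indexed past the end; the one-line fix in Operator.java below widens it to Short.MAX_VALUE (32767). A minimal standalone illustration of the failure mode, using hypothetical names rather than Hive's classes:

// TagArraySketch.java - illustrative only, not Hive code
public class TagArraySketch {
  // before the fix: 127 slots, valid tags 0..126
  static Object[] inspectorsBefore = new Object[Byte.MAX_VALUE];
  // after the fix: 32767 slots, enough for a union with 200 parents
  static Object[] inspectorsAfter = new Object[Short.MAX_VALUE];

  public static void main(String[] args) {
    int parents = 200; // the failing case reported in HIVE-991
    for (int tag = 0; tag < parents; tag++) {
      inspectorsAfter[tag] = "inspector for parent " + tag; // fine after the fix
    }
    try {
      inspectorsBefore[parents - 1] = "inspector for parent 199";
    } catch (ArrayIndexOutOfBoundsException e) {
      // tags 127..199 overflow a Byte.MAX_VALUE-sized array
      System.out.println("union with 200 children fails: " + e);
    }
  }
}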

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=890582&r1=890581&r2=890582&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Dec 15 00:32:04 2009
@@ -321,6 +321,9 @@
 
     HIVE-973. Support nested types in custom scripts. (Namit Jain via zshao)
 
+    HIVE-991. Union with 200 children fails.
+    (Namit Jain via He Yongqiang)
+
 Release 0.4.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=890582&r1=890581&r2=890582&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Dec 15 00:32:04 2009
@@ -50,7 +50,7 @@
   // Bean methods
 
   private static final long serialVersionUID = 1L;
-  
+
   protected List<Operator<? extends Serializable>> childOperators;
   protected List<Operator<? extends Serializable>> parentOperators;
   protected String operatorId;
@@ -63,28 +63,28 @@
    * Individual operators can add to this list via addToCounterNames methods
    */
   protected ArrayList<String> counterNames;
-  
+
   /**
   * Each operator has its own map from its counter names to disjoint
   * ProgressCounters - it is populated at compile time and read in
   * at run time while extracting the operator-specific counts
    */
   protected HashMap<String, ProgressCounter> counterNameToEnum;
-  
+
 
   private static int seqId;
-  
+
  // It can be optimized later so that an operator operation (init/close) is performed
   // only after that operation has been performed on all the parents. This will require
   // initializing the whole tree in all the mappers (which might be required for mappers
   // spanning multiple files anyway, in future)
-  public static enum State { 
+  public static enum State {
     UNINIT,   // initialize() has not been called
     INIT,     // initialize() has been called and close() has not been called,
              // or close() has been called but one of its parents is not closed.
-    CLOSE     // all its parent operators are in state CLOSE and called close() 
+    CLOSE     // all its parent operators are in state CLOSE and called close()
               // to children. Note: close() being called and its state being CLOSE is
-              // different since close() could be called but state is not CLOSE if 
+              // different since close() could be called but state is not CLOSE if
              // one of its parents is not in state CLOSE.
   };
   transient protected State state = State.UNINIT;
@@ -92,11 +92,11 @@
   static {
     seqId = 0;
   }
-  
+
   public Operator() {
     id = String.valueOf(seqId++);
   }
-  
+
   /**
    * Create an operator with a reporter.
    * @param reporter Used to report progress of certain operators.
@@ -118,19 +118,19 @@
    * Implements the getChildren function for the Node Interface.
    */
   public Vector<Node> getChildren() {
-    
+
     if (getChildOperators() == null) {
       return null;
     }
-    
+
     Vector<Node> ret_vec = new Vector<Node>();
     for(Operator<? extends Serializable> op: getChildOperators()) {
       ret_vec.add(op);
     }
-    
+
     return ret_vec;
   }
-  
+
   public void setParentOperators(List<Operator<? extends Serializable>> parentOperators) {
     this.parentOperators = parentOperators;
   }
@@ -138,7 +138,7 @@
   public List<Operator<? extends Serializable>> getParentOperators() {
     return parentOperators;
   }
-  
+
   protected T conf;
   protected boolean done;
 
@@ -169,7 +169,7 @@
   public RowSchema getSchema() {
     return rowSchema;
   }
-  
+
   // non-bean ..
 
   transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable> ();
@@ -179,9 +179,9 @@
   transient protected Reporter reporter;
   transient protected String id;
   // object inspectors for input rows
-  transient protected ObjectInspector[] inputObjInspectors = new ObjectInspector[Byte.MAX_VALUE];
+  transient protected ObjectInspector[] inputObjInspectors = new ObjectInspector[Short.MAX_VALUE];
   // for output rows of this operator
-  transient protected ObjectInspector outputObjInspector; 
+  transient protected ObjectInspector outputObjInspector;
 
   /**
    * A map of output column name to input expression map. This is used by optimizer
@@ -193,7 +193,7 @@
   public void setId(String id) {
     this.id = id;
   }
-  
+
   /**
    * This function is not named getId(), to make sure java serialization
    * does NOT serialize it.  Some TestParse tests will fail if we serialize
@@ -201,7 +201,7 @@
    * query tests.
    */
   public String getIdentifier() { return id; }
-  
+
   public void setReporter(Reporter rep) {
     reporter = rep;
 
@@ -213,7 +213,7 @@
       op.setReporter(rep);
     }
   }
-  
+
   public void setOutputCollector(OutputCollector out) {
     this.out = out;
 
@@ -267,7 +267,7 @@
   /**
    * Initializes operators only if all parents have been initialized.
    * Calls operator specific initializer which then initializes child ops.
-   * 
+   *
    * @param hconf
   * @param inputOIs input object inspector array indexed by tag id. A null value is ignored.
    * @throws HiveException
@@ -281,24 +281,24 @@
       return;
     }
     LOG.info("Initializing Self " + id + " " + getName());
-    
+
     if (inputOIs != null) {
       inputObjInspectors = inputOIs;
     }
-    
+
    // initialize structure to maintain child op info. The operator tree changes while
    // initializing, so this needs to be done here instead of in the initialize() method
     if (childOperators != null) {
       childOperatorsArray = new Operator[childOperators.size()];
       for (int i=0; i<childOperatorsArray.length; i++) {
-        childOperatorsArray[i] = childOperators.get(i); 
+        childOperatorsArray[i] = childOperators.get(i);
       }
       childOperatorsTag = new int[childOperatorsArray.length];
       for (int i=0; i<childOperatorsArray.length; i++) {
-        List<Operator<? extends Serializable>> parentOperators = 
+        List<Operator<? extends Serializable>> parentOperators =
           childOperatorsArray[i].getParentOperators();
         if (parentOperators == null) {
-          throw new HiveException("Hive internal error: parent is null in " 
+          throw new HiveException("Hive internal error: parent is null in "
               + childOperatorsArray[i].getClass() + "!");
         }
         childOperatorsTag[i] = parentOperators.indexOf(this);
@@ -307,13 +307,13 @@
         }
       }
     }
-    
+
     if (inputObjInspectors.length == 0) {
       throw new HiveException("Internal Error during operator initialization.");
     }
    // derived classes can set this to a different object if needed
     outputObjInspector = inputObjInspectors[0];
-    
+
     initializeOp(hconf);
     LOG.info("Initialization Done " + id + " " + getName());
   }
@@ -324,7 +324,7 @@
   protected void initializeOp(Configuration hconf) throws HiveException {
     initializeChildren(hconf);
   }
- 
+
   /**
   * Calls initialize on each of the children with outputObjInspector as the output row format
    */
@@ -351,10 +351,10 @@
     LOG.info("Initializing child " + id + " " + getName());
     inputObjInspectors[parentId] = inputOI;
     // call the actual operator initialization function
-    initialize(hconf, null);    
+    initialize(hconf, null);
   }
 
-  
+
    /**
    * Process the row.
    * @param row  The object representing the row.
@@ -362,7 +362,7 @@
    *             Rows with the same tag should have exactly the same rowInspector all the time.
    */
   public abstract void processOp(Object row, int tag) throws HiveException;
-  
+
   /**
    * Process the row.
    * @param row  The object representing the row.
@@ -374,32 +374,32 @@
     processOp(row, tag);
     postProcessCounter();
   }
-  
+
  // If an operator wants to do some work at the beginning of a group
   public void startGroup() throws HiveException {
     LOG.debug("Starting group");
-    
+
     if (childOperators == null)
       return;
-    
+
     LOG.debug("Starting group for children:");
     for (Operator<? extends Serializable> op: childOperators)
       op.startGroup();
-    
+
     LOG.debug("Start group Done");
-  }  
-  
+  }
+
  // If an operator wants to do some work at the end of a group
   public void endGroup() throws HiveException {
     LOG.debug("Ending group");
-    
+
     if (childOperators == null)
       return;
-    
+
     LOG.debug("Ending group for children:");
     for (Operator<? extends Serializable> op: childOperators)
       op.endGroup();
-    
+
     LOG.debug("End group Done");
   }
 
@@ -414,30 +414,30 @@
     }
     return true;
   }
-  
+
   // This close() function does not need to be synchronized
   // since it is called by its parents' main thread, so no
   // more than 1 thread should call this close() function.
   public void close(boolean abort) throws HiveException {
 
-    if (state == State.CLOSE) 
+    if (state == State.CLOSE)
       return;
 
     // check if all parents are finished
-    if (!allInitializedParentsAreClosed()) 
+    if (!allInitializedParentsAreClosed())
       return;
-    
+
     // set state as CLOSE as long as all parents are closed
     // state == CLOSE doesn't mean all children are also in state CLOSE
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
-    
+
     if (counterNameToEnum != null) {
       incrCounter(numInputRowsCntr, inputRows);
       incrCounter(numOutputRowsCntr, outputRows);
       incrCounter(timeTakenCntr, totalTime);
     }
-    
+
     LOG.info(id + " forwarded " + cntr + " rows");
 
     // call the operator specific close routine
@@ -458,7 +458,7 @@
       throw e;
     }
   }
-  
+
   /**
   * Operator-specific close routine. Operators which inherit this
   * class should override this function for their specific cleanup
@@ -478,7 +478,7 @@
   public void jobClose(Configuration conf, boolean success) throws HiveException {
     if(childOperators == null)
       return;
-    
+
     for(Operator<? extends Serializable> op: childOperators) {
       op.jobClose(conf, success);
     }
@@ -489,7 +489,7 @@
    *  per row, so it's important to make the access efficient.
    */
   transient protected Operator<? extends Serializable>[] childOperatorsArray = null;
-  transient protected int[] childOperatorsTag; 
+  transient protected int[] childOperatorsTag;
 
   // counters for debugging
   transient private long cntr = 0;
@@ -509,11 +509,11 @@
   public void  removeChild(Operator<? extends Serializable> child) {
     int childIndex = childOperators.indexOf(child);
     assert childIndex != -1;
-    if (childOperators.size() == 1) 
+    if (childOperators.size() == 1)
       childOperators = null;
     else
       childOperators.remove(childIndex);
-    
+
     int parentIndex = child.getParentOperators().indexOf(this);
     assert parentIndex != -1;
     if (child.getParentOperators().size() == 1)
@@ -538,19 +538,19 @@
     // every 1 million times, and quickly before that
     if (cntr >= 1000000)
       return cntr + 1000000;
-    
+
     return 10 * cntr;
   }
 
   protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
-    
+
     if ((++outputRows % 1000) == 0) {
       if (counterNameToEnum != null) {
         incrCounter(numOutputRowsCntr, outputRows);
         outputRows = 0;
       }
     }
-    
+
     if (LOG.isInfoEnabled()) {
       cntr++;
       if (cntr == nextCntr) {
@@ -562,15 +562,15 @@
     // For debugging purposes:
     // System.out.println("" + this.getClass() + ": " + SerDeUtils.getJSONString(row, rowInspector));
     // System.out.println("" + this.getClass() + ">> " + ObjectInspectorUtils.getObjectInspectorName(rowInspector));
- 
+
     if (childOperatorsArray == null && childOperators != null) {
       throw new HiveException("Internal Hive error during operator initialization.");
     }
-    
+
     if((childOperatorsArray == null) || (getDone())) {
       return;
     }
-    
+
     int childrenDone = 0;
     for (int i = 0; i < childOperatorsArray.length; i++) {
       Operator<? extends Serializable> o = childOperatorsArray[i];
@@ -580,7 +580,7 @@
         o.process(row, childOperatorsTag[i]);
       }
     }
-    
+
     // if all children are done, this operator is also done
     if (childrenDone == childOperatorsArray.length) {
       setDone(true);
@@ -609,7 +609,7 @@
   public void logStats () {
     for(Enum<?> e: statsMap.keySet()) {
       LOG.info(e.toString() + ":" + statsMap.get(e).toString());
-    }    
+    }
   }
 
   /**
@@ -632,7 +632,7 @@
   public void setColumnExprMap(Map<String, exprNodeDesc> colExprMap) {
     this.colExprMap = colExprMap;
   }
-  
+
   private String getLevelString(int level) {
     if (level == 0) {
       return "\n";
@@ -645,22 +645,22 @@
     }
     return s.toString();
   }
-  
+
   public String dump(int level) {
     return dump(level, new HashSet<Integer>());
   }
-  
+
   public String dump(int level, HashSet<Integer> seenOpts) {
     if ( seenOpts.contains(new Integer(id)))
       return null;
     seenOpts.add(new Integer(id));
-    
+
     StringBuilder s = new StringBuilder();
     String ls = getLevelString(level);
     s.append(ls);
     s.append("<" + getName() + ">");
     s.append("Id =" + id);
-    
+
     if (childOperators != null) {
       s.append(ls);
       s.append("  <Children>");
@@ -685,12 +685,12 @@
     s.append("<\\" + getName() + ">");
     return s.toString();
   }
-  
+
   /**
    * Initialize an array of ExprNodeEvaluator and return the result
    * ObjectInspectors.
-   */  
-  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals, 
+   */
+  protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
       ObjectInspector rowInspector) throws HiveException {
     ObjectInspector[] result = new ObjectInspector[evals.length];
     for (int i=0; i<evals.length; i++) {
@@ -700,55 +700,55 @@
   }
 
   /**
-   * Initialize an array of ExprNodeEvaluator and put the return values into a 
+   * Initialize an array of ExprNodeEvaluator and put the return values into a
    * StructObjectInspector with integer field names.
-   */  
+   */
   protected static StructObjectInspector initEvaluatorsAndReturnStruct(
-      ExprNodeEvaluator[] evals, List<String> outputColName, ObjectInspector rowInspector) 
+      ExprNodeEvaluator[] evals, List<String> outputColName, ObjectInspector rowInspector)
       throws HiveException {
     ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, rowInspector);
     return ObjectInspectorFactory.getStandardStructObjectInspector(
         outputColName,
         Arrays.asList(fieldObjectInspectors));
   }
-  
+
   /**
    * All counter stuff below this
    */
-  
+
   /**
    * TODO This is a hack for hadoop 0.17 which only supports enum counters
    */
-  public static enum ProgressCounter { 
-    C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16, 
-    C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, 
-    C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, 
-    C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, 
-    C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80, 
-    C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96, 
-    C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112, 
+  public static enum ProgressCounter {
+    C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16,
+    C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32,
+    C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48,
+    C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64,
+    C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
+    C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96,
+    C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112,
     C113, C114, C115, C116, C117, C118, C119, C120, C121, C122, C123, C124, C125, C126, C127, C128
   };
 
   private static int totalNumCntrs = 128;
-  
+
   /**
   * populated at run time from hadoop counters in the client
    */
   transient protected Map<String, Long> counters;
-  
+
   /**
    * keeps track of unique ProgressCounter enums used
    * this value is used at compile time while assigning ProgressCounter
    * enums to counter names
    */
-  private static int lastEnumUsed;  
+  private static int lastEnumUsed;
 
   transient protected long inputRows = 0;
   transient protected long outputRows = 0;
   transient protected long beginTime = 0;
   transient protected long totalTime = 0;
-  
+
   /**
    * this is called before operator process to buffer some counters
    */
@@ -776,7 +776,7 @@
       totalTime += (System.currentTimeMillis() - beginTime);
   }
 
-  
+
   /**
    * this is called in operators in map or reduce tasks
    * @param name
@@ -813,11 +813,11 @@
   public void setOperatorId(String operatorId) {
     this.operatorId = operatorId;
   }
-  
+
   public Map<String, Long> getCounters() {
     return counters;
   }
-  
+
   /**
    * called in ExecDriver.progress periodically
    * @param ctrs counters from the running job
@@ -861,14 +861,14 @@
     counterNameToEnum = new HashMap<String, ProgressCounter>();
     for (String counterName: getCounterNames()) {
       ++lastEnumUsed;
-      
+
       // TODO Hack for hadoop-0.17
      // Currently, only a maximum of 'totalNumCntrs' counters can be used. If you want
       // to add more counters, increase the number of counters in ProgressCounter
       if (lastEnumUsed > totalNumCntrs) {
         LOG.warn("Using too many counters. Increase the total number of counters");
         return;
-      }      
+      }
       String enumName = "C" + lastEnumUsed;
       ProgressCounter ctr = ProgressCounter.valueOf(enumName);
       counterNameToEnum.put(counterName, ctr);
@@ -878,7 +878,7 @@
   protected static String numInputRowsCntr  = "NUM_INPUT_ROWS";
   protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
   protected static String timeTakenCntr     = "TIME_TAKEN";
-  
+
   public void initializeCounters() {
     initOperatorId();
     counterNames = new ArrayList<String>();
@@ -906,11 +906,11 @@
   public void setCounterNameToEnum(HashMap<String, ProgressCounter> counterNameToEnum) {
     this.counterNameToEnum = counterNameToEnum;
   }
-  
+
    /**
    * Should be overridden to return the type of the specific operator among
     * the types in OperatorType
-    * 
+    *
     * @return OperatorType.* or -1 if not overridden
     */
    public int getType() {
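
For context on the counter hunks above (a reader's sketch, not part of the commit): because Hadoop 0.17 supports only enum counters, operator counter names are bound at compile time to a fixed pool of 128 enum constants (C1..C128) via ProgressCounter.valueOf("C" + n), and the assignment loop warns and stops once the pool is exhausted. A minimal standalone version of that pattern, assuming simplified types and a pool of four:

// CounterMappingSketch.java - illustrative only, not Hive code
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CounterMappingSketch {
  enum ProgressCounter { C1, C2, C3, C4 } // Operator.java defines C1..C128

  static final int TOTAL_NUM_CNTRS = 4;   // 128 in Operator.java
  static int lastEnumUsed = 0;

  // mirrors the assignment loop shown in the diff: each counter name
  // claims the next unused enum constant by its generated name
  static Map<String, ProgressCounter> assign(List<String> counterNames) {
    Map<String, ProgressCounter> m = new HashMap<String, ProgressCounter>();
    for (String name : counterNames) {
      ++lastEnumUsed;
      if (lastEnumUsed > TOTAL_NUM_CNTRS) {
        System.err.println("Using too many counters. Increase the total number of counters");
        return m;
      }
      m.put(name, ProgressCounter.valueOf("C" + lastEnumUsed));
    }
    return m;
  }

  public static void main(String[] args) {
    // the three per-operator counters named near the end of the diff
    List<String> names = Arrays.asList("NUM_INPUT_ROWS", "NUM_OUTPUT_ROWS", "TIME_TAKEN");
    System.out.println(assign(names)); // e.g. {NUM_INPUT_ROWS=C1, ...}
  }
}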