You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2009/12/15 01:32:04 UTC
svn commit: r890582 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
Author: heyongqiang
Date: Tue Dec 15 00:32:04 2009
New Revision: 890582
URL: http://svn.apache.org/viewvc?rev=890582&view=rev
Log:
HIVE-991: UNION with 200 children fails
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=890582&r1=890581&r2=890582&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Dec 15 00:32:04 2009
@@ -321,6 +321,9 @@
HIVE-973. Support nested types in custom scripts. (Namit Jain via zshao)
+ HIVE-991 union with 200 kids fail
+ (namit via He Yongqiang)
+
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=890582&r1=890581&r2=890582&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Dec 15 00:32:04 2009
@@ -50,7 +50,7 @@
// Bean methods
private static final long serialVersionUID = 1L;
-
+
protected List<Operator<? extends Serializable>> childOperators;
protected List<Operator<? extends Serializable>> parentOperators;
protected String operatorId;
@@ -63,28 +63,28 @@
* Individual operators can add to this list via addToCounterNames methods
*/
protected ArrayList<String> counterNames;
-
+
/**
* Each operator has its own map of its counter names to disjoint
* ProgressCounter - it is populated at compile time and is read in
* at run-time while extracting the operator specific counts
*/
protected HashMap<String, ProgressCounter> counterNameToEnum;
-
+
private static int seqId;
-
+
// It can be optimized later so that an operator operator (init/close) is performed
// only after that operation has been performed on all the parents. This will require
// initializing the whole tree in all the mappers (which might be required for mappers
// spanning multiple files anyway, in future)
- public static enum State {
+ public static enum State {
UNINIT, // initialize() has not been called
INIT, // initialize() has been called and close() has not been called,
// or close() has been called but one of its parent is not closed.
- CLOSE // all its parents operators are in state CLOSE and called close()
+ CLOSE // all its parents operators are in state CLOSE and called close()
// to children. Note: close() being called and its state being CLOSE is
- // difference since close() could be called but state is not CLOSE if
+ // difference since close() could be called but state is not CLOSE if
// one of its parent is not in state CLOSE..
};
transient protected State state = State.UNINIT;
@@ -92,11 +92,11 @@
static {
seqId = 0;
}
-
+
public Operator() {
id = String.valueOf(seqId++);
}
-
+
/**
* Create an operator with a reporter.
* @param reporter Used to report progress of certain operators.
@@ -118,19 +118,19 @@
* Implements the getChildren function for the Node Interface.
*/
public Vector<Node> getChildren() {
-
+
if (getChildOperators() == null) {
return null;
}
-
+
Vector<Node> ret_vec = new Vector<Node>();
for(Operator<? extends Serializable> op: getChildOperators()) {
ret_vec.add(op);
}
-
+
return ret_vec;
}
-
+
public void setParentOperators(List<Operator<? extends Serializable>> parentOperators) {
this.parentOperators = parentOperators;
}
@@ -138,7 +138,7 @@
public List<Operator<? extends Serializable>> getParentOperators() {
return parentOperators;
}
-
+
protected T conf;
protected boolean done;
@@ -169,7 +169,7 @@
public RowSchema getSchema() {
return rowSchema;
}
-
+
// non-bean ..
transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable> ();
@@ -179,9 +179,9 @@
transient protected Reporter reporter;
transient protected String id;
// object inspectors for input rows
- transient protected ObjectInspector[] inputObjInspectors = new ObjectInspector[Byte.MAX_VALUE];
+ transient protected ObjectInspector[] inputObjInspectors = new ObjectInspector[Short.MAX_VALUE];
// for output rows of this operator
- transient protected ObjectInspector outputObjInspector;
+ transient protected ObjectInspector outputObjInspector;
/**
* A map of output column name to input expression map. This is used by optimizer
@@ -193,7 +193,7 @@
public void setId(String id) {
this.id = id;
}
-
+
/**
* This function is not named getId(), to make sure java serialization
* does NOT serialize it. Some TestParse tests will fail if we serialize
@@ -201,7 +201,7 @@
* query tests.
*/
public String getIdentifier() { return id; }
-
+
public void setReporter(Reporter rep) {
reporter = rep;
@@ -213,7 +213,7 @@
op.setReporter(rep);
}
}
-
+
public void setOutputCollector(OutputCollector out) {
this.out = out;
@@ -267,7 +267,7 @@
/**
* Initializes operators only if all parents have been initialized.
* Calls operator specific initializer which then initializes child ops.
- *
+ *
* @param hconf
* @param inputOIs input object inspector array indexes by tag id. null value is ignored.
* @throws HiveException
@@ -281,24 +281,24 @@
return;
}
LOG.info("Initializing Self " + id + " " + getName());
-
+
if (inputOIs != null) {
inputObjInspectors = inputOIs;
}
-
+
// initialize structure to maintain child op info. operator tree changes while
// initializing so this need to be done here instead of initialize() method
if (childOperators != null) {
childOperatorsArray = new Operator[childOperators.size()];
for (int i=0; i<childOperatorsArray.length; i++) {
- childOperatorsArray[i] = childOperators.get(i);
+ childOperatorsArray[i] = childOperators.get(i);
}
childOperatorsTag = new int[childOperatorsArray.length];
for (int i=0; i<childOperatorsArray.length; i++) {
- List<Operator<? extends Serializable>> parentOperators =
+ List<Operator<? extends Serializable>> parentOperators =
childOperatorsArray[i].getParentOperators();
if (parentOperators == null) {
- throw new HiveException("Hive internal error: parent is null in "
+ throw new HiveException("Hive internal error: parent is null in "
+ childOperatorsArray[i].getClass() + "!");
}
childOperatorsTag[i] = parentOperators.indexOf(this);
@@ -307,13 +307,13 @@
}
}
}
-
+
if (inputObjInspectors.length == 0) {
throw new HiveException("Internal Error during operator initialization.");
}
// derived classes can set this to different object if needed
outputObjInspector = inputObjInspectors[0];
-
+
initializeOp(hconf);
LOG.info("Initialization Done " + id + " " + getName());
}
@@ -324,7 +324,7 @@
protected void initializeOp(Configuration hconf) throws HiveException {
initializeChildren(hconf);
}
-
+
/**
* Calls initialize on each of the children with outputObjetInspector as the output row format
*/
@@ -351,10 +351,10 @@
LOG.info("Initializing child " + id + " " + getName());
inputObjInspectors[parentId] = inputOI;
// call the actual operator initialization function
- initialize(hconf, null);
+ initialize(hconf, null);
}
-
+
/**
* Process the row.
* @param row The object representing the row.
@@ -362,7 +362,7 @@
* Rows with the same tag should have exactly the same rowInspector all the time.
*/
public abstract void processOp(Object row, int tag) throws HiveException;
-
+
/**
* Process the row.
* @param row The object representing the row.
@@ -374,32 +374,32 @@
processOp(row, tag);
postProcessCounter();
}
-
+
// If a operator wants to do some work at the beginning of a group
public void startGroup() throws HiveException {
LOG.debug("Starting group");
-
+
if (childOperators == null)
return;
-
+
LOG.debug("Starting group for children:");
for (Operator<? extends Serializable> op: childOperators)
op.startGroup();
-
+
LOG.debug("Start group Done");
- }
-
+ }
+
// If a operator wants to do some work at the end of a group
public void endGroup() throws HiveException {
LOG.debug("Ending group");
-
+
if (childOperators == null)
return;
-
+
LOG.debug("Ending group for children:");
for (Operator<? extends Serializable> op: childOperators)
op.endGroup();
-
+
LOG.debug("End group Done");
}
@@ -414,30 +414,30 @@
}
return true;
}
-
+
// This close() function does not need to be synchronized
// since it is called by its parents' main thread, so no
// more than 1 thread should call this close() function.
public void close(boolean abort) throws HiveException {
- if (state == State.CLOSE)
+ if (state == State.CLOSE)
return;
// check if all parents are finished
- if (!allInitializedParentsAreClosed())
+ if (!allInitializedParentsAreClosed())
return;
-
+
// set state as CLOSE as long as all parents are closed
// state == CLOSE doesn't mean all children are also in state CLOSE
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
-
+
if (counterNameToEnum != null) {
incrCounter(numInputRowsCntr, inputRows);
incrCounter(numOutputRowsCntr, outputRows);
incrCounter(timeTakenCntr, totalTime);
}
-
+
LOG.info(id + " forwarded " + cntr + " rows");
// call the operator specific close routine
@@ -458,7 +458,7 @@
throw e;
}
}
-
+
/**
* Operator specific close routine. Operators which inherents this
* class should overwrite this funtion for their specific cleanup
@@ -478,7 +478,7 @@
public void jobClose(Configuration conf, boolean success) throws HiveException {
if(childOperators == null)
return;
-
+
for(Operator<? extends Serializable> op: childOperators) {
op.jobClose(conf, success);
}
@@ -489,7 +489,7 @@
* per row, so it's important to make the access efficient.
*/
transient protected Operator<? extends Serializable>[] childOperatorsArray = null;
- transient protected int[] childOperatorsTag;
+ transient protected int[] childOperatorsTag;
// counters for debugging
transient private long cntr = 0;
@@ -509,11 +509,11 @@
public void removeChild(Operator<? extends Serializable> child) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
- if (childOperators.size() == 1)
+ if (childOperators.size() == 1)
childOperators = null;
else
childOperators.remove(childIndex);
-
+
int parentIndex = child.getParentOperators().indexOf(this);
assert parentIndex != -1;
if (child.getParentOperators().size() == 1)
@@ -538,19 +538,19 @@
// every 1 million times, and quickly before that
if (cntr >= 1000000)
return cntr + 1000000;
-
+
return 10 * cntr;
}
protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
-
+
if ((++outputRows % 1000) == 0) {
if (counterNameToEnum != null) {
incrCounter(numOutputRowsCntr, outputRows);
outputRows = 0;
}
}
-
+
if (LOG.isInfoEnabled()) {
cntr++;
if (cntr == nextCntr) {
@@ -562,15 +562,15 @@
// For debugging purposes:
// System.out.println("" + this.getClass() + ": " + SerDeUtils.getJSONString(row, rowInspector));
// System.out.println("" + this.getClass() + ">> " + ObjectInspectorUtils.getObjectInspectorName(rowInspector));
-
+
if (childOperatorsArray == null && childOperators != null) {
throw new HiveException("Internal Hive error during operator initialization.");
}
-
+
if((childOperatorsArray == null) || (getDone())) {
return;
}
-
+
int childrenDone = 0;
for (int i = 0; i < childOperatorsArray.length; i++) {
Operator<? extends Serializable> o = childOperatorsArray[i];
@@ -580,7 +580,7 @@
o.process(row, childOperatorsTag[i]);
}
}
-
+
// if all children are done, this operator is also done
if (childrenDone == childOperatorsArray.length) {
setDone(true);
@@ -609,7 +609,7 @@
public void logStats () {
for(Enum<?> e: statsMap.keySet()) {
LOG.info(e.toString() + ":" + statsMap.get(e).toString());
- }
+ }
}
/**
@@ -632,7 +632,7 @@
public void setColumnExprMap(Map<String, exprNodeDesc> colExprMap) {
this.colExprMap = colExprMap;
}
-
+
private String getLevelString(int level) {
if (level == 0) {
return "\n";
@@ -645,22 +645,22 @@
}
return s.toString();
}
-
+
public String dump(int level) {
return dump(level, new HashSet<Integer>());
}
-
+
public String dump(int level, HashSet<Integer> seenOpts) {
if ( seenOpts.contains(new Integer(id)))
return null;
seenOpts.add(new Integer(id));
-
+
StringBuilder s = new StringBuilder();
String ls = getLevelString(level);
s.append(ls);
s.append("<" + getName() + ">");
s.append("Id =" + id);
-
+
if (childOperators != null) {
s.append(ls);
s.append(" <Children>");
@@ -685,12 +685,12 @@
s.append("<\\" + getName() + ">");
return s.toString();
}
-
+
/**
* Initialize an array of ExprNodeEvaluator and return the result
* ObjectInspectors.
- */
- protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
+ */
+ protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
ObjectInspector rowInspector) throws HiveException {
ObjectInspector[] result = new ObjectInspector[evals.length];
for (int i=0; i<evals.length; i++) {
@@ -700,55 +700,55 @@
}
/**
- * Initialize an array of ExprNodeEvaluator and put the return values into a
+ * Initialize an array of ExprNodeEvaluator and put the return values into a
* StructObjectInspector with integer field names.
- */
+ */
protected static StructObjectInspector initEvaluatorsAndReturnStruct(
- ExprNodeEvaluator[] evals, List<String> outputColName, ObjectInspector rowInspector)
+ ExprNodeEvaluator[] evals, List<String> outputColName, ObjectInspector rowInspector)
throws HiveException {
ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, rowInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
outputColName,
Arrays.asList(fieldObjectInspectors));
}
-
+
/**
* All counter stuff below this
*/
-
+
/**
* TODO This is a hack for hadoop 0.17 which only supports enum counters
*/
- public static enum ProgressCounter {
- C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16,
- C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32,
- C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48,
- C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64,
- C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
- C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96,
- C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112,
+ public static enum ProgressCounter {
+ C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16,
+ C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32,
+ C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48,
+ C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64,
+ C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
+ C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96,
+ C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112,
C113, C114, C115, C116, C117, C118, C119, C120, C121, C122, C123, C124, C125, C126, C127, C128
};
private static int totalNumCntrs = 128;
-
+
/**
* populated at runtime from hadoop counters at run time in the client
*/
transient protected Map<String, Long> counters;
-
+
/**
* keeps track of unique ProgressCounter enums used
* this value is used at compile time while assigning ProgressCounter
* enums to counter names
*/
- private static int lastEnumUsed;
+ private static int lastEnumUsed;
transient protected long inputRows = 0;
transient protected long outputRows = 0;
transient protected long beginTime = 0;
transient protected long totalTime = 0;
-
+
/**
* this is called before operator process to buffer some counters
*/
@@ -776,7 +776,7 @@
totalTime += (System.currentTimeMillis() - beginTime);
}
-
+
/**
* this is called in operators in map or reduce tasks
* @param name
@@ -813,11 +813,11 @@
public void setOperatorId(String operatorId) {
this.operatorId = operatorId;
}
-
+
public Map<String, Long> getCounters() {
return counters;
}
-
+
/**
* called in ExecDriver.progress periodically
* @param ctrs counters from the running job
@@ -861,14 +861,14 @@
counterNameToEnum = new HashMap<String, ProgressCounter>();
for (String counterName: getCounterNames()) {
++lastEnumUsed;
-
+
// TODO Hack for hadoop-0.17
// Currently, only maximum number of 'totalNumCntrs' can be used. If you want
// to add more counters, increase the number of counters in ProgressCounter
if (lastEnumUsed > totalNumCntrs) {
LOG.warn("Using too many counters. Increase the total number of counters");
return;
- }
+ }
String enumName = "C" + lastEnumUsed;
ProgressCounter ctr = ProgressCounter.valueOf(enumName);
counterNameToEnum.put(counterName, ctr);
@@ -878,7 +878,7 @@
protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
protected static String timeTakenCntr = "TIME_TAKEN";
-
+
public void initializeCounters() {
initOperatorId();
counterNames = new ArrayList<String>();
@@ -906,11 +906,11 @@
public void setCounterNameToEnum(HashMap<String, ProgressCounter> counterNameToEnum) {
this.counterNameToEnum = counterNameToEnum;
}
-
+
/**
* Should be overridden to return the type of the specific operator among
* the types in OperatorType
- *
+ *
* @return OperatorType.* or -1 if not overridden
*/
public int getType() {