You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/26 09:19:34 UTC
svn commit: r1545564 [3/22] - in /hive/branches/tez: ./ ant/ beeline/ bin/
cli/ common/ common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/common/type/
common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/...
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Nov 26 08:19:25 2013
@@ -46,7 +46,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
@@ -59,24 +59,13 @@ public abstract class Operator<T extends
private static final long serialVersionUID = 1L;
+ public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
+ public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+
private transient Configuration configuration;
protected List<Operator<? extends OperatorDesc>> childOperators;
protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
- /**
- * List of counter names associated with the operator. It contains the
- * following default counters NUM_INPUT_ROWS NUM_OUTPUT_ROWS TIME_TAKEN
- * Individual operators can add to this list via addToCounterNames methods.
- */
- protected ArrayList<String> counterNames;
-
- /**
- * Each operator has its own map of its counter names to disjoint
- * ProgressCounter - it is populated at compile time and is read in at
- * run-time while extracting the operator specific counts.
- */
- protected HashMap<String, ProgressCounter> counterNameToEnum;
-
private transient ExecMapperContext execContext;
private static AtomicInteger seqId;
@@ -98,13 +87,10 @@ public abstract class Operator<T extends
// to children. Note: close() being called and its state being CLOSE is
// difference since close() could be called but state is not CLOSE if
// one of its parent is not in state CLOSE..
- };
+ }
protected transient State state = State.UNINIT;
- protected static transient boolean fatalError = false; // fatalError is shared acorss
- // all operators
-
static {
seqId = new AtomicInteger(0);
}
@@ -115,6 +101,7 @@ public abstract class Operator<T extends
id = String.valueOf(seqId.getAndIncrement());
childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ initOperatorId();
}
public static void resetId() {
@@ -197,10 +184,10 @@ public abstract class Operator<T extends
}
public boolean getDone() {
- return done || fatalError;
+ return done;
}
- public void setDone(boolean done) {
+ protected final void setDone(boolean done) {
this.done = done;
}
@@ -218,6 +205,8 @@ public abstract class Operator<T extends
// non-bean ..
protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+ @SuppressWarnings("rawtypes")
+ protected transient OutputCollector out;
protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
protected transient String alias;
@@ -263,6 +252,20 @@ public abstract class Operator<T extends
}
}
+ @SuppressWarnings("rawtypes")
+ public void setOutputCollector(OutputCollector out) {
+ this.out = out;
+
+ // the collector is same across all operators
+ if (childOperators == null) {
+ return;
+ }
+
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.setOutputCollector(out);
+ }
+ }
+
/**
* Store the alias this operator is working on behalf of.
*/
@@ -318,6 +321,7 @@ public abstract class Operator<T extends
* ignored.
* @throws HiveException
*/
+ @SuppressWarnings("unchecked")
public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
throws HiveException {
if (state == State.INIT) {
@@ -475,38 +479,6 @@ public abstract class Operator<T extends
*/
public abstract void processOp(Object row, int tag) throws HiveException;
- /**
- * Process the row.
- *
- * @param row
- * The object representing the row.
- * @param tag
- * The tag of the row usually means which parent this row comes from.
- * Rows with the same tag should have exactly the same rowInspector
- * all the time.
- */
- public void process(Object row, int tag) throws HiveException {
- if (fatalError) {
- return;
- }
-
- if (counterNameToEnum != null) {
- inputRows++;
- if ((inputRows % 1000) == 0) {
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(timeTakenCntr, totalTime);
- inputRows = 0;
- totalTime = 0;
- }
-
- beginTime = System.currentTimeMillis();
- processOp(row, tag);
- totalTime += (System.currentTimeMillis() - beginTime);
- } else {
- processOp(row, tag);
- }
- }
-
protected final void defaultStartGroup() throws HiveException {
LOG.debug("Starting group");
@@ -514,10 +486,6 @@ public abstract class Operator<T extends
return;
}
- if (fatalError) {
- return;
- }
-
LOG.debug("Starting group for children:");
for (Operator<? extends OperatorDesc> op : childOperators) {
op.startGroup();
@@ -533,10 +501,6 @@ public abstract class Operator<T extends
return;
}
- if (fatalError) {
- return;
- }
-
LOG.debug("Ending group for children:");
for (Operator<? extends OperatorDesc> op : childOperators) {
op.endGroup();
@@ -607,14 +571,6 @@ public abstract class Operator<T extends
reporter = null;
- if (counterNameToEnum != null) {
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(numOutputRowsCntr, outputRows);
- incrCounter(timeTakenCntr, totalTime);
- }
-
- LOG.info(id + " forwarded " + cntr + " rows");
-
try {
logStats();
if (childOperators == null) {
@@ -679,10 +635,6 @@ public abstract class Operator<T extends
protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null;
protected transient int[] childOperatorsTag;
- // counters for debugging
- private transient long cntr = 0;
- private transient long nextCntr = 1;
-
/**
* Replace one child with another at the same position. The parent of the
* child is not changed
@@ -821,21 +773,6 @@ public abstract class Operator<T extends
protected void forward(Object row, ObjectInspector rowInspector)
throws HiveException {
- if (counterNameToEnum != null) {
- if ((++outputRows % 1000) == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
- }
- }
-
- increaseForward(1);
-
- // For debugging purposes:
- // System.out.println("" + this.getClass() + ": " +
- // SerDeUtils.getJSONString(row, rowInspector));
- // System.out.println("" + this.getClass() + ">> " +
- // ObjectInspectorUtils.getObjectInspectorName(rowInspector));
-
if ((childOperatorsArray == null) || (getDone())) {
return;
}
@@ -846,7 +783,7 @@ public abstract class Operator<T extends
if (o.getDone()) {
childrenDone++;
} else {
- o.process(row, childOperatorsTag[i]);
+ o.processOp(row, childOperatorsTag[i]);
}
}
@@ -856,18 +793,6 @@ public abstract class Operator<T extends
}
}
- void increaseForward(long counter) {
- if (isLogInfoEnabled) {
- cntr += counter;
- if (cntr >= nextCntr) {
- LOG.info(id + " forwarding " + cntr + " rows");
- do {
- nextCntr = getNextCntr(nextCntr);
- } while(cntr >= nextCntr);
- }
- }
- }
-
public void resetStats() {
for (Enum<?> e : statsMap.keySet()) {
statsMap.get(e).set(0L);
@@ -1028,174 +953,12 @@ public abstract class Operator<T extends
outputColName, Arrays.asList(fieldObjectInspectors));
}
- /**
- * All counter stuff below this
- */
-
- /**
- * TODO This is a hack for hadoop 0.17 which only supports enum counters.
- */
- public static enum ProgressCounter {
- CREATED_FILES,
- C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
- C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
- C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,
- C31, C32, C33, C34, C35, C36, C37, C38, C39, C40,
- C41, C42, C43, C44, C45, C46, C47, C48, C49, C50,
- C51, C52, C53, C54, C55, C56, C57, C58, C59, C60,
- C61, C62, C63, C64, C65, C66, C67, C68, C69, C70,
- C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
- C81, C82, C83, C84, C85, C86, C87, C88, C89, C90,
- C91, C92, C93, C94, C95, C96, C97, C98, C99, C100,
- C101, C102, C103, C104, C105, C106, C107, C108, C109, C110,
- C111, C112, C113, C114, C115, C116, C117, C118, C119, C120,
- C121, C122, C123, C124, C125, C126, C127, C128, C129, C130,
- C131, C132, C133, C134, C135, C136, C137, C138, C139, C140,
- C141, C142, C143, C144, C145, C146, C147, C148, C149, C150,
- C151, C152, C153, C154, C155, C156, C157, C158, C159, C160,
- C161, C162, C163, C164, C165, C166, C167, C168, C169, C170,
- C171, C172, C173, C174, C175, C176, C177, C178, C179, C180,
- C181, C182, C183, C184, C185, C186, C187, C188, C189, C190,
- C191, C192, C193, C194, C195, C196, C197, C198, C199, C200,
- C201, C202, C203, C204, C205, C206, C207, C208, C209, C210,
- C211, C212, C213, C214, C215, C216, C217, C218, C219, C220,
- C221, C222, C223, C224, C225, C226, C227, C228, C229, C230,
- C231, C232, C233, C234, C235, C236, C237, C238, C239, C240,
- C241, C242, C243, C244, C245, C246, C247, C248, C249, C250,
- C251, C252, C253, C254, C255, C256, C257, C258, C259, C260,
- C261, C262, C263, C264, C265, C266, C267, C268, C269, C270,
- C271, C272, C273, C274, C275, C276, C277, C278, C279, C280,
- C281, C282, C283, C284, C285, C286, C287, C288, C289, C290,
- C291, C292, C293, C294, C295, C296, C297, C298, C299, C300,
- C301, C302, C303, C304, C305, C306, C307, C308, C309, C310,
- C311, C312, C313, C314, C315, C316, C317, C318, C319, C320,
- C321, C322, C323, C324, C325, C326, C327, C328, C329, C330,
- C331, C332, C333, C334, C335, C336, C337, C338, C339, C340,
- C341, C342, C343, C344, C345, C346, C347, C348, C349, C350,
- C351, C352, C353, C354, C355, C356, C357, C358, C359, C360,
- C361, C362, C363, C364, C365, C366, C367, C368, C369, C370,
- C371, C372, C373, C374, C375, C376, C377, C378, C379, C380,
- C381, C382, C383, C384, C385, C386, C387, C388, C389, C390,
- C391, C392, C393, C394, C395, C396, C397, C398, C399, C400,
- C401, C402, C403, C404, C405, C406, C407, C408, C409, C410,
- C411, C412, C413, C414, C415, C416, C417, C418, C419, C420,
- C421, C422, C423, C424, C425, C426, C427, C428, C429, C430,
- C431, C432, C433, C434, C435, C436, C437, C438, C439, C440,
- C441, C442, C443, C444, C445, C446, C447, C448, C449, C450,
- C451, C452, C453, C454, C455, C456, C457, C458, C459, C460,
- C461, C462, C463, C464, C465, C466, C467, C468, C469, C470,
- C471, C472, C473, C474, C475, C476, C477, C478, C479, C480,
- C481, C482, C483, C484, C485, C486, C487, C488, C489, C490,
- C491, C492, C493, C494, C495, C496, C497, C498, C499, C500,
- C501, C502, C503, C504, C505, C506, C507, C508, C509, C510,
- C511, C512, C513, C514, C515, C516, C517, C518, C519, C520,
- C521, C522, C523, C524, C525, C526, C527, C528, C529, C530,
- C531, C532, C533, C534, C535, C536, C537, C538, C539, C540,
- C541, C542, C543, C544, C545, C546, C547, C548, C549, C550,
- C551, C552, C553, C554, C555, C556, C557, C558, C559, C560,
- C561, C562, C563, C564, C565, C566, C567, C568, C569, C570,
- C571, C572, C573, C574, C575, C576, C577, C578, C579, C580,
- C581, C582, C583, C584, C585, C586, C587, C588, C589, C590,
- C591, C592, C593, C594, C595, C596, C597, C598, C599, C600,
- C601, C602, C603, C604, C605, C606, C607, C608, C609, C610,
- C611, C612, C613, C614, C615, C616, C617, C618, C619, C620,
- C621, C622, C623, C624, C625, C626, C627, C628, C629, C630,
- C631, C632, C633, C634, C635, C636, C637, C638, C639, C640,
- C641, C642, C643, C644, C645, C646, C647, C648, C649, C650,
- C651, C652, C653, C654, C655, C656, C657, C658, C659, C660,
- C661, C662, C663, C664, C665, C666, C667, C668, C669, C670,
- C671, C672, C673, C674, C675, C676, C677, C678, C679, C680,
- C681, C682, C683, C684, C685, C686, C687, C688, C689, C690,
- C691, C692, C693, C694, C695, C696, C697, C698, C699, C700,
- C701, C702, C703, C704, C705, C706, C707, C708, C709, C710,
- C711, C712, C713, C714, C715, C716, C717, C718, C719, C720,
- C721, C722, C723, C724, C725, C726, C727, C728, C729, C730,
- C731, C732, C733, C734, C735, C736, C737, C738, C739, C740,
- C741, C742, C743, C744, C745, C746, C747, C748, C749, C750,
- C751, C752, C753, C754, C755, C756, C757, C758, C759, C760,
- C761, C762, C763, C764, C765, C766, C767, C768, C769, C770,
- C771, C772, C773, C774, C775, C776, C777, C778, C779, C780,
- C781, C782, C783, C784, C785, C786, C787, C788, C789, C790,
- C791, C792, C793, C794, C795, C796, C797, C798, C799, C800,
- C801, C802, C803, C804, C805, C806, C807, C808, C809, C810,
- C811, C812, C813, C814, C815, C816, C817, C818, C819, C820,
- C821, C822, C823, C824, C825, C826, C827, C828, C829, C830,
- C831, C832, C833, C834, C835, C836, C837, C838, C839, C840,
- C841, C842, C843, C844, C845, C846, C847, C848, C849, C850,
- C851, C852, C853, C854, C855, C856, C857, C858, C859, C860,
- C861, C862, C863, C864, C865, C866, C867, C868, C869, C870,
- C871, C872, C873, C874, C875, C876, C877, C878, C879, C880,
- C881, C882, C883, C884, C885, C886, C887, C888, C889, C890,
- C891, C892, C893, C894, C895, C896, C897, C898, C899, C900,
- C901, C902, C903, C904, C905, C906, C907, C908, C909, C910,
- C911, C912, C913, C914, C915, C916, C917, C918, C919, C920,
- C921, C922, C923, C924, C925, C926, C927, C928, C929, C930,
- C931, C932, C933, C934, C935, C936, C937, C938, C939, C940,
- C941, C942, C943, C944, C945, C946, C947, C948, C949, C950,
- C951, C952, C953, C954, C955, C956, C957, C958, C959, C960,
- C961, C962, C963, C964, C965, C966, C967, C968, C969, C970,
- C971, C972, C973, C974, C975, C976, C977, C978, C979, C980,
- C981, C982, C983, C984, C985, C986, C987, C988, C989, C990,
- C991, C992, C993, C994, C995, C996, C997, C998, C999, C1000
- };
-
- private static int totalNumCntrs = 1000;
-
- /**
- * populated at runtime from hadoop counters at run time in the client.
- */
- protected transient HashMap<String, Long> counters;
-
- /**
- * keeps track of unique ProgressCounter enums used this value is used at
- * compile time while assigning ProgressCounter enums to counter names.
- */
- private static int lastEnumUsed;
-
- protected transient long inputRows = 0;
- protected transient long outputRows = 0;
- protected transient long beginTime = 0;
- protected transient long totalTime = 0;
-
protected transient Object groupKeyObject;
- /**
- * this is called in operators in map or reduce tasks.
- *
- * @param name
- * @param amount
- */
- protected void incrCounter(String name, long amount) {
- String counterName = getWrappedCounterName(name);
- ProgressCounter pc = counterNameToEnum.get(counterName);
-
- // Currently, we maintain fixed number of counters per plan - in case of a
- // bigger tree, we may run out of them
- if (pc == null) {
- LOG
- .warn("Using too many counters. Increase the total number of counters for "
- + counterName);
- } else if (reporter != null) {
- reporter.incrCounter(pc, amount);
- }
- }
-
- public ArrayList<String> getCounterNames() {
- return counterNames;
- }
-
- public void setCounterNames(ArrayList<String> counterNames) {
- this.counterNames = counterNames;
- }
-
public String getOperatorId() {
return operatorId;
}
- public final String getWrappedCounterName(String ctrName) {
- return String.format(counterNameFormat, getOperatorId(), ctrName);
- }
-
public void initOperatorId() {
setOperatorId(getName() + "_" + this.id);
}
@@ -1204,145 +967,6 @@ public abstract class Operator<T extends
this.operatorId = operatorId;
}
- public HashMap<String, Long> getCounters() {
- return counters;
- }
-
- /**
- * called in ExecDriver.progress periodically.
- *
- * @param ctrs
- * counters from the running job
- */
- @SuppressWarnings("unchecked")
- public void updateCounters(Counters ctrs) {
- if (counters == null) {
- counters = new HashMap<String, Long>();
- }
-
- // For some old unit tests, the counters will not be populated. Eventually,
- // the old tests should be removed
- if (counterNameToEnum == null) {
- return;
- }
-
- for (Map.Entry<String, ProgressCounter> counter : counterNameToEnum
- .entrySet()) {
- counters.put(counter.getKey(), ctrs.getCounter(counter.getValue()));
- }
- // update counters of child operators
- // this wont be an infinite loop since the operator graph is acyclic
- // but, some operators may be updated more than once and that's ok
- if (getChildren() != null) {
- for (Node op : getChildren()) {
- ((Operator<? extends OperatorDesc>) op).updateCounters(ctrs);
- }
- }
- }
-
- /**
- * Recursively check this operator and its descendants to see if the fatal
- * error counter is set to non-zero.
- *
- * @param ctrs
- */
- public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
- if (counterNameToEnum == null) {
- return false;
- }
-
- String counterName = getWrappedCounterName(fatalErrorCntr);
- ProgressCounter pc = counterNameToEnum.get(counterName);
-
- // Currently, we maintain fixed number of counters per plan - in case of a
- // bigger tree, we may run out of them
- if (pc == null) {
- LOG
- .warn("Using too many counters. Increase the total number of counters for "
- + counterName);
- } else {
- long value = ctrs.getCounter(pc);
- fatalErrorMessage(errMsg, value);
- if (value != 0) {
- return true;
- }
- }
-
- if (getChildren() != null) {
- for (Node op : getChildren()) {
- if (((Operator<? extends OperatorDesc>) op).checkFatalErrors(ctrs,
- errMsg)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Get the fatal error message based on counter's code.
- *
- * @param errMsg
- * error message should be appended to this output parameter.
- * @param counterValue
- * input counter code.
- */
- protected void fatalErrorMessage(StringBuilder errMsg, long counterValue) {
- }
-
- // A given query can have multiple map-reduce jobs
- public static void resetLastEnumUsed() {
- lastEnumUsed = 0;
- }
-
- /**
- * Called only in SemanticAnalyzer after all operators have added their own
- * set of counter names.
- */
- public void assignCounterNameToEnum() {
- if (counterNameToEnum != null) {
- return;
- }
- counterNameToEnum = new HashMap<String, ProgressCounter>();
- for (String counterName : getCounterNames()) {
- ++lastEnumUsed;
-
- // TODO Hack for hadoop-0.17
- // Currently, only maximum number of 'totalNumCntrs' can be used. If you
- // want
- // to add more counters, increase the number of counters in
- // ProgressCounter
- if (lastEnumUsed > totalNumCntrs) {
- LOG
- .warn("Using too many counters. Increase the total number of counters");
- return;
- }
- String enumName = "C" + lastEnumUsed;
- ProgressCounter ctr = ProgressCounter.valueOf(enumName);
- counterNameToEnum.put(counterName, ctr);
- }
- }
-
- protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
- protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
- protected static String timeTakenCntr = "TIME_TAKEN";
- protected static String fatalErrorCntr = "FATAL_ERROR";
- private static String counterNameFormat = "CNTR_NAME_%s_%s";
-
- public void initializeCounters() {
- initOperatorId();
- counterNames = new ArrayList<String>();
- counterNames.add(getWrappedCounterName(numInputRowsCntr));
- counterNames.add(getWrappedCounterName(numOutputRowsCntr));
- counterNames.add(getWrappedCounterName(timeTakenCntr));
- counterNames.add(getWrappedCounterName(fatalErrorCntr));
- /* getAdditionalCounter should return Wrapped counters */
- List<String> newCntrs = getAdditionalCounters();
- if (newCntrs != null) {
- counterNames.addAll(newCntrs);
- }
- }
-
/*
* By default, the list is empty - if an operator wants to add more counters,
* it should override this method and provide the new list. Counter names returned
@@ -1353,15 +977,6 @@ public abstract class Operator<T extends
return null;
}
- public HashMap<String, ProgressCounter> getCounterNameToEnum() {
- return counterNameToEnum;
- }
-
- public void setCounterNameToEnum(
- HashMap<String, ProgressCounter> counterNameToEnum) {
- this.counterNameToEnum = counterNameToEnum;
- }
-
/**
* Return the type of the specific operator among the
* types in OperatorType.
@@ -1435,10 +1050,10 @@ public abstract class Operator<T extends
}
}
+ @SuppressWarnings("unchecked")
T descClone = (T)conf.clone();
Operator<? extends OperatorDesc> ret =
- (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
- descClone, getSchema(), parentClones);
+ OperatorFactory.getAndMakeChild(descClone, getSchema(), parentClones);
return ret;
}
@@ -1564,25 +1179,6 @@ public abstract class Operator<T extends
}
/**
- * Computes and retrieves the stats for this operator. Default implementation assumes same
- * input/output size for operator.
- *
- * @return Statistics for this operator
- */
- public Statistics getStatistics(HiveConf conf) throws HiveException {
- Statistics stats = this.getConf().getStatistics();
-
- if (stats == null) {
- stats = new Statistics();
- for (Operator<? extends OperatorDesc> parent: this.getParentOperators()) {
- stats.addNumberOfBytes(parent.getStatistics(conf).getNumberOfBytes());
- }
- this.getConf().setStatistics(stats);
- }
- return stats;
- }
-
- /**
* used for LimitPushdownOptimizer
*
* if all of the operators between limit and reduce-sink does not remove any input rows
@@ -1636,4 +1232,22 @@ public abstract class Operator<T extends
}
return false;
}
+
+ public Statistics getStatistics() {
+ if (conf != null) {
+ return conf.getStatistics();
+ }
+ return null;
+ }
+
+ public void setStatistics(Statistics stats) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting stats ("+stats+") on "+this);
+ }
+ if (conf != null) {
+ conf.setStatistics(stats);
+ } else {
+ LOG.warn("Cannot set stats when there's no descriptor: "+this);
+ }
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Nov 26 08:19:25 2013
@@ -137,7 +137,6 @@ public final class OperatorFactory {
Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
VectorizationContext.class, OperatorDesc.class).newInstance(
vContext, conf);
- op.initializeCounters();
return op;
} catch (Exception e) {
e.printStackTrace();
@@ -155,7 +154,6 @@ public final class OperatorFactory {
if (o.descClass == opClass) {
try {
Operator<T> op = (Operator<T>) o.opClass.newInstance();
- op.initializeCounters();
return op;
} catch (Exception e) {
e.printStackTrace();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov 26 08:19:25 2013
@@ -363,13 +363,6 @@ public class ReduceSinkOperator extends
if (null != out) {
out.collect(keyWritable, valueWritable);
}
- if (++outputRows % 1000 == 0) {
- if (counterNameToEnum != null) {
- incrCounter(numOutputRowsCntr, outputRows);
- }
- increaseForward(outputRows);
- outputRows = 0;
- }
}
private BytesWritable makeValueWritable(Object row) throws Exception {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Nov 26 08:19:25 2013
@@ -550,7 +550,7 @@ public class SMBMapJoinOperator extends
fetchDone[tag] = true;
return;
}
- forwardOp.process(row.o, tag);
+ forwardOp.processOp(row.o, tag);
// check if any operator had a fatal error or early exit during
// execution
if (forwardOp.getDone()) {
@@ -795,7 +795,7 @@ public class SMBMapJoinOperator extends
// Pass the row though the operator tree. It is guaranteed that not more than 1 row can
// be produced from a input row.
- forwardOp.process(nextRow.o, 0);
+ forwardOp.processOp(nextRow.o, 0);
nextRow = sinkOp.getResult();
// It is possible that the row got absorbed in the operator tree.
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov 26 08:19:25 2013
@@ -28,9 +28,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -178,11 +176,11 @@ public class StatsTask extends Task<Stat
if (!this.getWork().getNoStatsAggregator()) {
String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsFactory.setImplementation(statsImplementationClass, conf);
- if (work.isNoScanAnalyzeCommand()){
+ StatsFactory factory = StatsFactory.newFactory(statsImplementationClass, conf);
+ if (factory != null && work.isNoScanAnalyzeCommand()){
// initialize stats publishing table for noscan which has only stats task
// the rest of MR task following stats task initializes it in ExecDriver.java
- StatsPublisher statsPublisher = StatsFactory.getStatsPublisher();
+ StatsPublisher statsPublisher = factory.getStatsPublisher();
if (!statsPublisher.init(conf)) { // creating stats table if not exists
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw
@@ -190,10 +188,12 @@ public class StatsTask extends Task<Stat
}
}
}
- statsAggregator = StatsFactory.getStatsAggregator();
- // manufacture a StatsAggregator
- if (!statsAggregator.connect(conf)) {
- throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+ if (factory != null) {
+ statsAggregator = factory.getStatsAggregator();
+ // manufacture a StatsAggregator
+ if (!statsAggregator.connect(conf, getWork().getSourceTask())) {
+ throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+ }
}
}
@@ -377,7 +377,7 @@ public class StatsTask extends Task<Stat
if (work.getLoadTableDesc() != null &&
!work.getLoadTableDesc().getReplace()) {
String originalValue = parameters.get(statType);
- if (originalValue != null) {
+ if (originalValue != null && !originalValue.equals("-1")) {
longValue += Long.parseLong(originalValue);
}
}
@@ -445,19 +445,4 @@ public class StatsTask extends Task<Stat
}
return list;
}
-
- /**
- * This method is static as it is called from the shutdown hook at the ExecDriver.
- */
- public static void cleanUp(String jobID, Configuration config) {
- StatsAggregator statsAggregator;
- String statsImplementationClass = HiveConf.getVar(config, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsFactory.setImplementation(statsImplementationClass, config);
- statsAggregator = StatsFactory.getStatsAggregator();
- if (statsAggregator.connect(config)) {
- statsAggregator.cleanUp(jobID + Path.SEPARATOR); // Adding the path separator to avoid an Id
- // being a prefix of another ID
- statsAggregator.closeConnection();
- }
- }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Nov 26 08:19:25 2013
@@ -322,18 +322,6 @@ public class TableScanOperator extends O
}
@Override
- public Statistics getStatistics(HiveConf conf) throws HiveException {
- Statistics stats = this.getConf().getStatistics();
- if (stats == null) {
- stats = new Statistics();
- stats.addNumberOfBytes(Utilities.getSize(alias, getConf().getTable(), conf,
- this, getConf().getPruningPredicate()));
- this.getConf().setStatistics(stats);
- }
- return stats;
- }
-
- @Override
public boolean supportSkewJoinOptimization() {
return true;
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 26 08:19:25 2013
@@ -837,7 +837,6 @@ public final class Utilities {
// workaround for java 1.5
e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
@@ -2155,12 +2154,6 @@ public final class Utilities {
HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf,
partDesc.getOverlayedProperties().getProperty(
hive_metastoreConstants.META_TABLE_STORAGE));
- if (handler == null) {
- // native table
- FileSystem fs = p.getFileSystem(myConf);
- resultMap.put(pathStr, fs.getContentSummary(p));
- return;
- }
if (handler instanceof InputEstimator) {
long total = 0;
TableDesc tableDesc = partDesc.getTableDesc();
@@ -2176,6 +2169,8 @@ public final class Utilities {
}
resultMap.put(pathStr, new ContentSummary(total, -1, -1));
}
+ FileSystem fs = p.getFileSystem(myConf);
+ resultMap.put(pathStr, fs.getContentSummary(p));
} catch (Exception e) {
// We safely ignore this exception for summary data.
// We don't update the cache to protect it from polluting other
@@ -2344,12 +2339,8 @@ public final class Utilities {
}
public static StatsPublisher getStatsPublisher(JobConf jc) {
- String statsImplementationClass = HiveConf.getVar(jc, HiveConf.ConfVars.HIVESTATSDBCLASS);
- if (StatsFactory.setImplementation(statsImplementationClass, jc)) {
- return StatsFactory.getStatsPublisher();
- } else {
- return null;
- }
+ StatsFactory factory = StatsFactory.newFactory(jc);
+ return factory == null ? null : factory.getStatsPublisher();
}
/**
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov 26 08:19:25 2013
@@ -82,7 +82,6 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.log4j.Appender;
@@ -185,17 +184,10 @@ public class ExecDriver extends Task<Map
* @return true if fatal errors happened during job execution, false otherwise.
*/
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
- for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
- if (op.checkFatalErrors(ctrs, errMsg)) {
- return true;
- }
- }
- if (work.getReduceWork() != null) {
- if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) {
- return true;
- }
- }
- return false;
+ Counters.Counter cntr = ctrs.findCounter(
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
+ Operator.HIVECOUNTERFATAL);
+ return cntr != null && cntr.getValue() > 0;
}
/**
@@ -414,9 +406,9 @@ public class ExecDriver extends Task<Map
if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
// initialize stats publishing table
StatsPublisher statsPublisher;
- String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS);
- if (StatsFactory.setImplementation(statsImplementationClass, job)) {
- statsPublisher = StatsFactory.getStatsPublisher();
+ StatsFactory factory = StatsFactory.newFactory(job);
+ if (factory != null) {
+ statsPublisher = factory.getStatsPublisher();
if (!statsPublisher.init(job)) { // creating stats table if not exists
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw
@@ -816,16 +808,6 @@ public class ExecDriver extends Task<Map
}
@Override
- public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
- for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
- op.updateCounters(ctrs);
- }
- if (work.getReduceWork() != null) {
- work.getReduceWork().getReducer().updateCounters(ctrs);
- }
- }
-
- @Override
public void logPlanProgress(SessionState ss) throws IOException {
ss.getHiveHistory().logPlanProgress(queryPlan);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Nov 26 08:19:25 2013
@@ -258,7 +258,7 @@ public class ExecReducer extends MapRedu
}
}
try {
- reducer.process(row, tag);
+ reducer.processOp(row, tag);
} catch (Exception e) {
String rowString = null;
try {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 26 08:19:25 2013
@@ -34,8 +34,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskHandle;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -80,14 +81,6 @@ public class HadoopJobExecHelper {
reduceProgress = reduceProgress == 100 ? (int)Math.floor(rj.reduceProgress() * 100) : reduceProgress;
task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
- if (ctrs == null) {
- // hadoop might return null if it cannot locate the job.
- // we may still be able to retrieve the job status - so ignore
- return;
- }
- if(callBackObj != null) {
- callBackObj.updateCounters(ctrs, rj);
- }
}
/**
@@ -200,6 +193,7 @@ public class HadoopJobExecHelper {
}
}
+ @SuppressWarnings("deprecation")
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
if (ctrs == null) {
// hadoop might return null if it cannot locate the job.
@@ -207,7 +201,9 @@ public class HadoopJobExecHelper {
return false;
}
// check for number of created files
- long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+ Counters.Counter cntr = ctrs.findCounter(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP),
+ Operator.HIVECOUNTERCREATEDFILES);
+ long numFiles = cntr != null ? cntr.getValue() : 0;
long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
if (numFiles > upperLimit) {
errMsg.append("total number of created files now is " + numFiles + ", which exceeds ").append(upperLimit);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java Tue Nov 26 08:19:25 2013
@@ -22,12 +22,10 @@ import java.io.IOException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.RunningJob;
@SuppressWarnings("deprecation")
public interface HadoopJobExecHook {
- public void updateCounters(Counters ctrs, RunningJob rj) throws IOException;
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg);
public void logPlanProgress(SessionState ss) throws IOException;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Nov 26 08:19:25 2013
@@ -230,8 +230,7 @@ public class MapredLocalTask extends Tas
if(ShimLoader.getHadoopShims().isSecurityEnabled() &&
- conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) == true
- ){
+ ShimLoader.getHadoopShims().isLoginKeytabBased()) {
//If kerberos security is enabled, and HS2 doAs is enabled,
// then additional params need to be set so that the command is run as
// intended user
@@ -359,7 +358,7 @@ public class MapredLocalTask extends Tas
forwardOp.close(false);
break;
}
- forwardOp.process(row.o, 0);
+ forwardOp.processOp(row.o, 0);
// check if any operator had a fatal error or early exit during
// execution
if (forwardOp.getDone()) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Nov 26 08:19:25 2013
@@ -619,9 +619,9 @@ public class DagUtils {
// initialize stats publisher if necessary
if (work.isGatheringStats()) {
StatsPublisher statsPublisher;
- String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- if (StatsFactory.setImplementation(statsImplementationClass, conf)) {
- statsPublisher = StatsFactory.getStatsPublisher();
+ StatsFactory factory = StatsFactory.newFactory(conf);
+ if (factory != null) {
+ statsPublisher = factory.getStatsPublisher();
if (!statsPublisher.init(conf)) { // creating stats table if not exists
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Nov 26 08:19:25 2013
@@ -295,7 +295,7 @@ public class ReduceRecordProcessor exte
row.add(valueObj);
try {
- reducer.process(row, tag);
+ reducer.processOp(row, tag);
} catch (Exception e) {
String rowString = null;
try {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Nov 26 08:19:25 2013
@@ -98,16 +98,6 @@ public class VectorFileSinkOperator exte
}
}
- // Since File Sink is a terminal operator, forward is not called - so,
- // maintain the number of output rows explicitly
- if (counterNameToEnum != null) {
- ++outputRows;
- if (outputRows % 1000 == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
- }
- }
-
try {
updateProgress();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Nov 26 08:19:25 2013
@@ -30,6 +30,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -100,16 +101,22 @@ public class VectorGroupByOperator exten
* Sum of batch size processed (ie. rows).
*/
private transient long sumBatchSize;
+
+ /**
+ * Max number of entries in the vector group by aggregation hashtables.
+ * Exceeding this will trigger a flush irrelevant of memory pressure condition.
+ */
+ private transient int maxHtEntries = 1000000;
/**
* The number of new entries that must be added to the hashtable before a memory size check.
*/
- private static final int FLUSH_CHECK_THRESHOLD = 10000;
+ private transient int checkInterval = 10000;
/**
* Percent of entries to flush when memory threshold exceeded.
*/
- private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f;
+ private transient float percentEntriesToFlush = 0.1f;
/**
* The global key-aggregation hash map.
@@ -139,6 +146,16 @@ public class VectorGroupByOperator exten
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
+
+ // hconf is null in unit testing
+ if (null != hconf) {
+ this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
+ this.checkInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
+ this.maxHtEntries = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+ }
List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -226,8 +243,21 @@ public class VectorGroupByOperator exten
processAggregators(batch);
//Flush if memory limits were reached
- if (shouldFlush(batch)) {
+ // We keep flushing until the memory is under threshold
+ int preFlushEntriesCount = numEntriesHashTable;
+ while (shouldFlush(batch)) {
flush(false);
+
+ //Validate that some progress is being made
+ if (!(numEntriesHashTable < preFlushEntriesCount)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after",
+ preFlushEntriesCount,
+ numEntriesHashTable));
+ }
+ break;
+ }
+ preFlushEntriesCount = numEntriesHashTable;
}
if (sumBatchSize == 0 && 0 != batch.size) {
@@ -247,7 +277,7 @@ public class VectorGroupByOperator exten
private void flush(boolean all) throws HiveException {
int entriesToFlush = all ? numEntriesHashTable :
- (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
+ (int)(numEntriesHashTable * this.percentEntriesToFlush);
int entriesFlushed = 0;
if (LOG.isDebugEnabled()) {
@@ -309,14 +339,18 @@ public class VectorGroupByOperator exten
* Returns true if the memory threshold for the hash table was reached.
*/
private boolean shouldFlush(VectorizedRowBatch batch) {
- if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD ||
- batch.size == 0) {
+ if (batch.size == 0) {
return false;
}
- // Were going to update the average variable row size by sampling the current batch
- updateAvgVariableSize(batch);
- numEntriesSinceCheck = 0;
- return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
+ //numEntriesSinceCheck is the number of entries added to the hash table
+ // since the last time we checked the average variable size
+ if (numEntriesSinceCheck >= this.checkInterval) {
+ // Were going to update the average variable row size by sampling the current batch
+ updateAvgVariableSize(batch);
+ numEntriesSinceCheck = 0;
+ }
+ return numEntriesHashTable > this.maxHtEntries ||
+ numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
}
/**
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java Tue Nov 26 08:19:25 2013
@@ -33,4 +33,6 @@ public interface VectorExpressionWriter
Object writeValue(long value) throws HiveException;
Object writeValue(double value) throws HiveException;
Object writeValue(byte[] value, int start, int length) throws HiveException;
+ Object setValue(Object row, ColumnVector column, int columnRow) throws HiveException;
+ Object initValue(Object ost) throws HiveException;
}