Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/26 09:19:34 UTC

svn commit: r1545564 [3/22] - in /hive/branches/tez: ./ ant/ beeline/ bin/ cli/ common/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/...

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Nov 26 08:19:25 2013
@@ -46,7 +46,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
@@ -59,24 +59,13 @@ public abstract class Operator<T extends
 
   private static final long serialVersionUID = 1L;
 
+  public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
+  public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+
   private transient Configuration configuration;
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
-  /**
-   * List of counter names associated with the operator. It contains the
-   * following default counters NUM_INPUT_ROWS NUM_OUTPUT_ROWS TIME_TAKEN
-   * Individual operators can add to this list via addToCounterNames methods.
-   */
-  protected ArrayList<String> counterNames;
-
-  /**
-   * Each operator has its own map of its counter names to disjoint
-   * ProgressCounter - it is populated at compile time and is read in at
-   * run-time while extracting the operator specific counts.
-   */
-  protected HashMap<String, ProgressCounter> counterNameToEnum;
-
   private transient ExecMapperContext execContext;
 
   private static AtomicInteger seqId;
@@ -98,13 +87,10 @@ public abstract class Operator<T extends
     // to children. Note: close() being called and its state being CLOSE is
     // different since close() could be called but state is not CLOSE if
     // one of its parents is not in state CLOSE..
-  };
+  }
 
   protected transient State state = State.UNINIT;
 
-  protected static transient boolean fatalError = false; // fatalError is shared acorss
-  // all operators
-
   static {
     seqId = new AtomicInteger(0);
   }
@@ -115,6 +101,7 @@ public abstract class Operator<T extends
     id = String.valueOf(seqId.getAndIncrement());
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+    initOperatorId();
   }
 
   public static void resetId() {
@@ -197,10 +184,10 @@ public abstract class Operator<T extends
   }
 
   public boolean getDone() {
-    return done || fatalError;
+    return done;
   }
 
-  public void setDone(boolean done) {
+  protected final void setDone(boolean done) {
     this.done = done;
   }
 
@@ -218,6 +205,8 @@ public abstract class Operator<T extends
   // non-bean ..
 
   protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+  @SuppressWarnings("rawtypes")
+  protected transient OutputCollector out;
   protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
   protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
   protected transient String alias;
@@ -263,6 +252,20 @@ public abstract class Operator<T extends
     }
   }
 
+  @SuppressWarnings("rawtypes")
+  public void setOutputCollector(OutputCollector out) {
+    this.out = out;
+
+    // the collector is the same across all operators
+    if (childOperators == null) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setOutputCollector(out);
+    }
+  }
+
   /**
    * Store the alias this operator is working on behalf of.
    */
@@ -318,6 +321,7 @@ public abstract class Operator<T extends
    *          ignored.
    * @throws HiveException
    */
+  @SuppressWarnings("unchecked")
   public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       throws HiveException {
     if (state == State.INIT) {
@@ -475,38 +479,6 @@ public abstract class Operator<T extends
    */
   public abstract void processOp(Object row, int tag) throws HiveException;
 
-  /**
-   * Process the row.
-   *
-   * @param row
-   *          The object representing the row.
-   * @param tag
-   *          The tag of the row usually means which parent this row comes from.
-   *          Rows with the same tag should have exactly the same rowInspector
-   *          all the time.
-   */
-  public void process(Object row, int tag) throws HiveException {
-    if (fatalError) {
-      return;
-    }
-
-    if (counterNameToEnum != null) {
-      inputRows++;
-      if ((inputRows % 1000) == 0) {
-        incrCounter(numInputRowsCntr, inputRows);
-        incrCounter(timeTakenCntr, totalTime);
-        inputRows = 0;
-        totalTime = 0;
-      }
-
-      beginTime = System.currentTimeMillis();
-      processOp(row, tag);
-      totalTime += (System.currentTimeMillis() - beginTime);
-    } else {
-      processOp(row, tag);
-    }
-  }
-
   protected final void defaultStartGroup() throws HiveException {
     LOG.debug("Starting group");
 
@@ -514,10 +486,6 @@ public abstract class Operator<T extends
       return;
     }
 
-    if (fatalError) {
-      return;
-    }
-
     LOG.debug("Starting group for children:");
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.startGroup();
@@ -533,10 +501,6 @@ public abstract class Operator<T extends
       return;
     }
 
-    if (fatalError) {
-      return;
-    }
-
     LOG.debug("Ending group for children:");
     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.endGroup();
@@ -607,14 +571,6 @@ public abstract class Operator<T extends
 
     reporter = null;
 
-    if (counterNameToEnum != null) {
-      incrCounter(numInputRowsCntr, inputRows);
-      incrCounter(numOutputRowsCntr, outputRows);
-      incrCounter(timeTakenCntr, totalTime);
-    }
-
-    LOG.info(id + " forwarded " + cntr + " rows");
-
     try {
       logStats();
       if (childOperators == null) {
@@ -679,10 +635,6 @@ public abstract class Operator<T extends
   protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null;
   protected transient int[] childOperatorsTag;
 
-  // counters for debugging
-  private transient long cntr = 0;
-  private transient long nextCntr = 1;
-
   /**
    * Replace one child with another at the same position. The parent of the
    * child is not changed
@@ -821,21 +773,6 @@ public abstract class Operator<T extends
   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
 
-    if (counterNameToEnum != null) {
-      if ((++outputRows % 1000) == 0) {
-        incrCounter(numOutputRowsCntr, outputRows);
-        outputRows = 0;
-      }
-    }
-
-    increaseForward(1);
-
-    // For debugging purposes:
-    // System.out.println("" + this.getClass() + ": " +
-    // SerDeUtils.getJSONString(row, rowInspector));
-    // System.out.println("" + this.getClass() + ">> " +
-    // ObjectInspectorUtils.getObjectInspectorName(rowInspector));
-
     if ((childOperatorsArray == null) || (getDone())) {
       return;
     }
@@ -846,7 +783,7 @@ public abstract class Operator<T extends
       if (o.getDone()) {
         childrenDone++;
       } else {
-        o.process(row, childOperatorsTag[i]);
+        o.processOp(row, childOperatorsTag[i]);
       }
     }
 
@@ -856,18 +793,6 @@ public abstract class Operator<T extends
     }
   }
 
-  void increaseForward(long counter) {
-    if (isLogInfoEnabled) {
-      cntr += counter;
-      if (cntr >= nextCntr) {
-        LOG.info(id + " forwarding " + cntr + " rows");
-        do {
-          nextCntr = getNextCntr(nextCntr);
-        } while(cntr >= nextCntr);
-      }
-    }
-  }
-
   public void resetStats() {
     for (Enum<?> e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
@@ -1028,174 +953,12 @@ public abstract class Operator<T extends
         outputColName, Arrays.asList(fieldObjectInspectors));
   }
 
-  /**
-   * All counter stuff below this
-   */
-
-  /**
-   * TODO This is a hack for hadoop 0.17 which only supports enum counters.
-   */
-  public static enum ProgressCounter {
-    CREATED_FILES,
-    C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
-    C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
-    C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,
-    C31, C32, C33, C34, C35, C36, C37, C38, C39, C40,
-    C41, C42, C43, C44, C45, C46, C47, C48, C49, C50,
-    C51, C52, C53, C54, C55, C56, C57, C58, C59, C60,
-    C61, C62, C63, C64, C65, C66, C67, C68, C69, C70,
-    C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
-    C81, C82, C83, C84, C85, C86, C87, C88, C89, C90,
-    C91, C92, C93, C94, C95, C96, C97, C98, C99, C100,
-    C101, C102, C103, C104, C105, C106, C107, C108, C109, C110,
-    C111, C112, C113, C114, C115, C116, C117, C118, C119, C120,
-    C121, C122, C123, C124, C125, C126, C127, C128, C129, C130,
-    C131, C132, C133, C134, C135, C136, C137, C138, C139, C140,
-    C141, C142, C143, C144, C145, C146, C147, C148, C149, C150,
-    C151, C152, C153, C154, C155, C156, C157, C158, C159, C160,
-    C161, C162, C163, C164, C165, C166, C167, C168, C169, C170,
-    C171, C172, C173, C174, C175, C176, C177, C178, C179, C180,
-    C181, C182, C183, C184, C185, C186, C187, C188, C189, C190,
-    C191, C192, C193, C194, C195, C196, C197, C198, C199, C200,
-    C201, C202, C203, C204, C205, C206, C207, C208, C209, C210,
-    C211, C212, C213, C214, C215, C216, C217, C218, C219, C220,
-    C221, C222, C223, C224, C225, C226, C227, C228, C229, C230,
-    C231, C232, C233, C234, C235, C236, C237, C238, C239, C240,
-    C241, C242, C243, C244, C245, C246, C247, C248, C249, C250,
-    C251, C252, C253, C254, C255, C256, C257, C258, C259, C260,
-    C261, C262, C263, C264, C265, C266, C267, C268, C269, C270,
-    C271, C272, C273, C274, C275, C276, C277, C278, C279, C280,
-    C281, C282, C283, C284, C285, C286, C287, C288, C289, C290,
-    C291, C292, C293, C294, C295, C296, C297, C298, C299, C300,
-    C301, C302, C303, C304, C305, C306, C307, C308, C309, C310,
-    C311, C312, C313, C314, C315, C316, C317, C318, C319, C320,
-    C321, C322, C323, C324, C325, C326, C327, C328, C329, C330,
-    C331, C332, C333, C334, C335, C336, C337, C338, C339, C340,
-    C341, C342, C343, C344, C345, C346, C347, C348, C349, C350,
-    C351, C352, C353, C354, C355, C356, C357, C358, C359, C360,
-    C361, C362, C363, C364, C365, C366, C367, C368, C369, C370,
-    C371, C372, C373, C374, C375, C376, C377, C378, C379, C380,
-    C381, C382, C383, C384, C385, C386, C387, C388, C389, C390,
-    C391, C392, C393, C394, C395, C396, C397, C398, C399, C400,
-    C401, C402, C403, C404, C405, C406, C407, C408, C409, C410,
-    C411, C412, C413, C414, C415, C416, C417, C418, C419, C420,
-    C421, C422, C423, C424, C425, C426, C427, C428, C429, C430,
-    C431, C432, C433, C434, C435, C436, C437, C438, C439, C440,
-    C441, C442, C443, C444, C445, C446, C447, C448, C449, C450,
-    C451, C452, C453, C454, C455, C456, C457, C458, C459, C460,
-    C461, C462, C463, C464, C465, C466, C467, C468, C469, C470,
-    C471, C472, C473, C474, C475, C476, C477, C478, C479, C480,
-    C481, C482, C483, C484, C485, C486, C487, C488, C489, C490,
-    C491, C492, C493, C494, C495, C496, C497, C498, C499, C500,
-    C501, C502, C503, C504, C505, C506, C507, C508, C509, C510,
-    C511, C512, C513, C514, C515, C516, C517, C518, C519, C520,
-    C521, C522, C523, C524, C525, C526, C527, C528, C529, C530,
-    C531, C532, C533, C534, C535, C536, C537, C538, C539, C540,
-    C541, C542, C543, C544, C545, C546, C547, C548, C549, C550,
-    C551, C552, C553, C554, C555, C556, C557, C558, C559, C560,
-    C561, C562, C563, C564, C565, C566, C567, C568, C569, C570,
-    C571, C572, C573, C574, C575, C576, C577, C578, C579, C580,
-    C581, C582, C583, C584, C585, C586, C587, C588, C589, C590,
-    C591, C592, C593, C594, C595, C596, C597, C598, C599, C600,
-    C601, C602, C603, C604, C605, C606, C607, C608, C609, C610,
-    C611, C612, C613, C614, C615, C616, C617, C618, C619, C620,
-    C621, C622, C623, C624, C625, C626, C627, C628, C629, C630,
-    C631, C632, C633, C634, C635, C636, C637, C638, C639, C640,
-    C641, C642, C643, C644, C645, C646, C647, C648, C649, C650,
-    C651, C652, C653, C654, C655, C656, C657, C658, C659, C660,
-    C661, C662, C663, C664, C665, C666, C667, C668, C669, C670,
-    C671, C672, C673, C674, C675, C676, C677, C678, C679, C680,
-    C681, C682, C683, C684, C685, C686, C687, C688, C689, C690,
-    C691, C692, C693, C694, C695, C696, C697, C698, C699, C700,
-    C701, C702, C703, C704, C705, C706, C707, C708, C709, C710,
-    C711, C712, C713, C714, C715, C716, C717, C718, C719, C720,
-    C721, C722, C723, C724, C725, C726, C727, C728, C729, C730,
-    C731, C732, C733, C734, C735, C736, C737, C738, C739, C740,
-    C741, C742, C743, C744, C745, C746, C747, C748, C749, C750,
-    C751, C752, C753, C754, C755, C756, C757, C758, C759, C760,
-    C761, C762, C763, C764, C765, C766, C767, C768, C769, C770,
-    C771, C772, C773, C774, C775, C776, C777, C778, C779, C780,
-    C781, C782, C783, C784, C785, C786, C787, C788, C789, C790,
-    C791, C792, C793, C794, C795, C796, C797, C798, C799, C800,
-    C801, C802, C803, C804, C805, C806, C807, C808, C809, C810,
-    C811, C812, C813, C814, C815, C816, C817, C818, C819, C820,
-    C821, C822, C823, C824, C825, C826, C827, C828, C829, C830,
-    C831, C832, C833, C834, C835, C836, C837, C838, C839, C840,
-    C841, C842, C843, C844, C845, C846, C847, C848, C849, C850,
-    C851, C852, C853, C854, C855, C856, C857, C858, C859, C860,
-    C861, C862, C863, C864, C865, C866, C867, C868, C869, C870,
-    C871, C872, C873, C874, C875, C876, C877, C878, C879, C880,
-    C881, C882, C883, C884, C885, C886, C887, C888, C889, C890,
-    C891, C892, C893, C894, C895, C896, C897, C898, C899, C900,
-    C901, C902, C903, C904, C905, C906, C907, C908, C909, C910,
-    C911, C912, C913, C914, C915, C916, C917, C918, C919, C920,
-    C921, C922, C923, C924, C925, C926, C927, C928, C929, C930,
-    C931, C932, C933, C934, C935, C936, C937, C938, C939, C940,
-    C941, C942, C943, C944, C945, C946, C947, C948, C949, C950,
-    C951, C952, C953, C954, C955, C956, C957, C958, C959, C960,
-    C961, C962, C963, C964, C965, C966, C967, C968, C969, C970,
-    C971, C972, C973, C974, C975, C976, C977, C978, C979, C980,
-    C981, C982, C983, C984, C985, C986, C987, C988, C989, C990,
-    C991, C992, C993, C994, C995, C996, C997, C998, C999, C1000
-  };
-
-  private static int totalNumCntrs = 1000;
-
-  /**
-   * populated at runtime from hadoop counters at run time in the client.
-   */
-  protected transient HashMap<String, Long> counters;
-
-  /**
-   * keeps track of unique ProgressCounter enums used this value is used at
-   * compile time while assigning ProgressCounter enums to counter names.
-   */
-  private static int lastEnumUsed;
-
-  protected transient long inputRows = 0;
-  protected transient long outputRows = 0;
-  protected transient long beginTime = 0;
-  protected transient long totalTime = 0;
-
   protected transient Object groupKeyObject;
 
-  /**
-   * this is called in operators in map or reduce tasks.
-   *
-   * @param name
-   * @param amount
-   */
-  protected void incrCounter(String name, long amount) {
-    String counterName = getWrappedCounterName(name);
-    ProgressCounter pc = counterNameToEnum.get(counterName);
-
-    // Currently, we maintain fixed number of counters per plan - in case of a
-    // bigger tree, we may run out of them
-    if (pc == null) {
-      LOG
-          .warn("Using too many counters. Increase the total number of counters for "
-          + counterName);
-    } else if (reporter != null) {
-      reporter.incrCounter(pc, amount);
-    }
-  }
-
-  public ArrayList<String> getCounterNames() {
-    return counterNames;
-  }
-
-  public void setCounterNames(ArrayList<String> counterNames) {
-    this.counterNames = counterNames;
-  }
-
   public String getOperatorId() {
     return operatorId;
   }
 
-  public final String getWrappedCounterName(String ctrName) {
-    return String.format(counterNameFormat, getOperatorId(), ctrName);
-  }
-
   public void initOperatorId() {
     setOperatorId(getName() + "_" + this.id);
   }
@@ -1204,145 +967,6 @@ public abstract class Operator<T extends
     this.operatorId = operatorId;
   }
 
-  public HashMap<String, Long> getCounters() {
-    return counters;
-  }
-
-  /**
-   * called in ExecDriver.progress periodically.
-   *
-   * @param ctrs
-   *          counters from the running job
-   */
-  @SuppressWarnings("unchecked")
-  public void updateCounters(Counters ctrs) {
-    if (counters == null) {
-      counters = new HashMap<String, Long>();
-    }
-
-    // For some old unit tests, the counters will not be populated. Eventually,
-    // the old tests should be removed
-    if (counterNameToEnum == null) {
-      return;
-    }
-
-    for (Map.Entry<String, ProgressCounter> counter : counterNameToEnum
-        .entrySet()) {
-      counters.put(counter.getKey(), ctrs.getCounter(counter.getValue()));
-    }
-    // update counters of child operators
-    // this wont be an infinite loop since the operator graph is acyclic
-    // but, some operators may be updated more than once and that's ok
-    if (getChildren() != null) {
-      for (Node op : getChildren()) {
-        ((Operator<? extends OperatorDesc>) op).updateCounters(ctrs);
-      }
-    }
-  }
-
-  /**
-   * Recursively check this operator and its descendants to see if the fatal
-   * error counter is set to non-zero.
-   *
-   * @param ctrs
-   */
-  public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
-    if (counterNameToEnum == null) {
-      return false;
-    }
-
-    String counterName = getWrappedCounterName(fatalErrorCntr);
-    ProgressCounter pc = counterNameToEnum.get(counterName);
-
-    // Currently, we maintain fixed number of counters per plan - in case of a
-    // bigger tree, we may run out of them
-    if (pc == null) {
-      LOG
-          .warn("Using too many counters. Increase the total number of counters for "
-          + counterName);
-    } else {
-      long value = ctrs.getCounter(pc);
-      fatalErrorMessage(errMsg, value);
-      if (value != 0) {
-        return true;
-      }
-    }
-
-    if (getChildren() != null) {
-      for (Node op : getChildren()) {
-        if (((Operator<? extends OperatorDesc>) op).checkFatalErrors(ctrs,
-            errMsg)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Get the fatal error message based on counter's code.
-   *
-   * @param errMsg
-   *          error message should be appended to this output parameter.
-   * @param counterValue
-   *          input counter code.
-   */
-  protected void fatalErrorMessage(StringBuilder errMsg, long counterValue) {
-  }
-
-  // A given query can have multiple map-reduce jobs
-  public static void resetLastEnumUsed() {
-    lastEnumUsed = 0;
-  }
-
-  /**
-   * Called only in SemanticAnalyzer after all operators have added their own
-   * set of counter names.
-   */
-  public void assignCounterNameToEnum() {
-    if (counterNameToEnum != null) {
-      return;
-    }
-    counterNameToEnum = new HashMap<String, ProgressCounter>();
-    for (String counterName : getCounterNames()) {
-      ++lastEnumUsed;
-
-      // TODO Hack for hadoop-0.17
-      // Currently, only maximum number of 'totalNumCntrs' can be used. If you
-      // want
-      // to add more counters, increase the number of counters in
-      // ProgressCounter
-      if (lastEnumUsed > totalNumCntrs) {
-        LOG
-            .warn("Using too many counters. Increase the total number of counters");
-        return;
-      }
-      String enumName = "C" + lastEnumUsed;
-      ProgressCounter ctr = ProgressCounter.valueOf(enumName);
-      counterNameToEnum.put(counterName, ctr);
-    }
-  }
-
-  protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
-  protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
-  protected static String timeTakenCntr = "TIME_TAKEN";
-  protected static String fatalErrorCntr = "FATAL_ERROR";
-  private static String counterNameFormat = "CNTR_NAME_%s_%s";
-
-  public void initializeCounters() {
-    initOperatorId();
-    counterNames = new ArrayList<String>();
-    counterNames.add(getWrappedCounterName(numInputRowsCntr));
-    counterNames.add(getWrappedCounterName(numOutputRowsCntr));
-    counterNames.add(getWrappedCounterName(timeTakenCntr));
-    counterNames.add(getWrappedCounterName(fatalErrorCntr));
-    /* getAdditionalCounter should return Wrapped counters */
-    List<String> newCntrs = getAdditionalCounters();
-    if (newCntrs != null) {
-      counterNames.addAll(newCntrs);
-    }
-  }
-
   /*
    * By default, the list is empty - if an operator wants to add more counters,
    * it should override this method and provide the new list. Counter names returned
@@ -1353,15 +977,6 @@ public abstract class Operator<T extends
     return null;
   }
 
-  public HashMap<String, ProgressCounter> getCounterNameToEnum() {
-    return counterNameToEnum;
-  }
-
-  public void setCounterNameToEnum(
-      HashMap<String, ProgressCounter> counterNameToEnum) {
-    this.counterNameToEnum = counterNameToEnum;
-  }
-
   /**
    * Return the type of the specific operator among the
    * types in OperatorType.
@@ -1435,10 +1050,10 @@ public abstract class Operator<T extends
       }
     }
 
+    @SuppressWarnings("unchecked")
     T descClone = (T)conf.clone();
     Operator<? extends OperatorDesc> ret =
-      (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
-        descClone, getSchema(), parentClones);
+        OperatorFactory.getAndMakeChild(descClone, getSchema(), parentClones);
 
     return ret;
   }
@@ -1564,25 +1179,6 @@ public abstract class Operator<T extends
   }
 
   /**
-   * Computes and retrieves the stats for this operator. Default implementation assumes same
-   * input/output size for operator.
-   *
-   * @return Statistics for this operator
-   */
-  public Statistics getStatistics(HiveConf conf) throws HiveException {
-    Statistics stats = this.getConf().getStatistics();
-
-    if (stats == null) {
-      stats = new Statistics();
-      for (Operator<? extends OperatorDesc> parent: this.getParentOperators()) {
-        stats.addNumberOfBytes(parent.getStatistics(conf).getNumberOfBytes());
-      }
-      this.getConf().setStatistics(stats);
-    }
-    return stats;
-  }
-
-  /**
    * used for LimitPushdownOptimizer
    *
    * if all of the operators between limit and reduce-sink does not remove any input rows
@@ -1636,4 +1232,22 @@ public abstract class Operator<T extends
     }
     return false;
   }
+
+  public Statistics getStatistics() {
+    if (conf != null) {
+      return conf.getStatistics();
+    }
+    return null;
+  }
+
+  public void setStatistics(Statistics stats) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting stats ("+stats+") on "+this);
+    }
+    if (conf != null) {
+      conf.setStatistics(stats);
+    } else {
+      LOG.warn("Cannot set stats when there's no descriptor: "+this);
+    }
+  }
 }
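
The enum-based ProgressCounter machinery removed above is superseded by plain string counter names (HIVECOUNTERCREATEDFILES, HIVECOUNTERFATAL) reported under the counter group configured through HiveConf.ConfVars.HIVECOUNTERGROUP. A minimal sketch, not part of this patch, of how an operator-side helper could bump one of these counters through the string-based Reporter API; the class and method names here are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.mapred.Reporter;

    // Sketch only: report the CREATED_FILES counter under the configured Hive
    // counter group, in place of the old ProgressCounter enum path.
    final class CreatedFilesCounterSketch {
      static void reportCreatedFile(Configuration hconf, Reporter reporter) {
        if (reporter == null) {
          return;
        }
        String group = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
        reporter.incrCounter(group, Operator.HIVECOUNTERCREATEDFILES, 1L);
      }
    }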

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Nov 26 08:19:25 2013
@@ -137,7 +137,6 @@ public final class OperatorFactory {
           Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
               VectorizationContext.class, OperatorDesc.class).newInstance(
               vContext, conf);
-          op.initializeCounters();
           return op;
         } catch (Exception e) {
           e.printStackTrace();
@@ -155,7 +154,6 @@ public final class OperatorFactory {
       if (o.descClass == opClass) {
         try {
           Operator<T> op = (Operator<T>) o.opClass.newInstance();
-          op.initializeCounters();
           return op;
         } catch (Exception e) {
           e.printStackTrace();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov 26 08:19:25 2013
@@ -363,13 +363,6 @@ public class ReduceSinkOperator extends 
     if (null != out) {
       out.collect(keyWritable, valueWritable);
     }
-    if (++outputRows % 1000 == 0) {
-      if (counterNameToEnum != null) {
-        incrCounter(numOutputRowsCntr, outputRows);
-      }
-      increaseForward(outputRows);
-      outputRows = 0;
-    }
   }
 
   private BytesWritable makeValueWritable(Object row) throws Exception {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Nov 26 08:19:25 2013
@@ -550,7 +550,7 @@ public class SMBMapJoinOperator extends 
         fetchDone[tag] = true;
         return;
       }
-      forwardOp.process(row.o, tag);
+      forwardOp.processOp(row.o, tag);
       // check if any operator had a fatal error or early exit during
       // execution
       if (forwardOp.getDone()) {
@@ -795,7 +795,7 @@ public class SMBMapJoinOperator extends 
 
         // Pass the row through the operator tree. It is guaranteed that not more than 1 row can
         // be produced from an input row.
-        forwardOp.process(nextRow.o, 0);
+        forwardOp.processOp(nextRow.o, 0);
         nextRow = sinkOp.getResult();
 
         // It is possible that the row got absorbed in the operator tree.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov 26 08:19:25 2013
@@ -28,9 +28,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -178,11 +176,11 @@ public class StatsTask extends Task<Stat
 
       if (!this.getWork().getNoStatsAggregator()) {
         String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-        StatsFactory.setImplementation(statsImplementationClass, conf);
-        if (work.isNoScanAnalyzeCommand()){
+        StatsFactory factory = StatsFactory.newFactory(statsImplementationClass, conf);
+        if (factory != null && work.isNoScanAnalyzeCommand()){
           // initialize stats publishing table for noscan which has only stats task
           // the rest of MR task following stats task initializes it in ExecDriver.java
-          StatsPublisher statsPublisher = StatsFactory.getStatsPublisher();
+          StatsPublisher statsPublisher = factory.getStatsPublisher();
           if (!statsPublisher.init(conf)) { // creating stats table if not exists
             if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
               throw
@@ -190,10 +188,12 @@ public class StatsTask extends Task<Stat
             }
           }
         }
-        statsAggregator = StatsFactory.getStatsAggregator();
-        // manufacture a StatsAggregator
-        if (!statsAggregator.connect(conf)) {
-          throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+        if (factory != null) {
+          statsAggregator = factory.getStatsAggregator();
+          // manufacture a StatsAggregator
+          if (!statsAggregator.connect(conf, getWork().getSourceTask())) {
+            throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+          }
         }
       }
 
@@ -377,7 +377,7 @@ public class StatsTask extends Task<Stat
         if (work.getLoadTableDesc() != null &&
             !work.getLoadTableDesc().getReplace()) {
           String originalValue = parameters.get(statType);
-          if (originalValue != null) {
+          if (originalValue != null && !originalValue.equals("-1")) {
             longValue += Long.parseLong(originalValue);
           }
         }
@@ -445,19 +445,4 @@ public class StatsTask extends Task<Stat
     }
     return list;
   }
-
-  /**
-   * This method is static as it is called from the shutdown hook at the ExecDriver.
-   */
-  public static void cleanUp(String jobID, Configuration config) {
-    StatsAggregator statsAggregator;
-    String statsImplementationClass = HiveConf.getVar(config, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    StatsFactory.setImplementation(statsImplementationClass, config);
-    statsAggregator = StatsFactory.getStatsAggregator();
-    if (statsAggregator.connect(config)) {
-      statsAggregator.cleanUp(jobID + Path.SEPARATOR); // Adding the path separator to avoid an Id
-                                                       // being a prefix of another ID
-      statsAggregator.closeConnection();
-    }
-  }
 }
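
This file and the ones below move the stats plumbing from the static StatsFactory.setImplementation(...)/getStatsPublisher() pair to an instance obtained via StatsFactory.newFactory(...). A minimal sketch of the resulting caller pattern, assuming a Configuration named conf; the wrapper class and method are hypothetical and error handling is reduced to comments:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.stats.StatsFactory;
    import org.apache.hadoop.hive.ql.stats.StatsPublisher;

    // Sketch only: instance-based StatsFactory usage as introduced by this patch.
    final class StatsFactorySketch {
      static StatsPublisher publisherFor(Configuration conf) {
        StatsFactory factory = StatsFactory.newFactory(conf);
        if (factory == null) {
          return null; // no factory for the configured stats implementation
        }
        StatsPublisher publisher = factory.getStatsPublisher();
        if (publisher != null && !publisher.init(conf)) {
          // creating the backing stats table failed; callers honor HIVE_STATS_RELIABLE
          return null;
        }
        return publisher;
      }
    }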

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Nov 26 08:19:25 2013
@@ -322,18 +322,6 @@ public class TableScanOperator extends O
   }
 
   @Override
-  public Statistics getStatistics(HiveConf conf) throws HiveException {
-    Statistics stats = this.getConf().getStatistics();
-    if (stats == null) {
-      stats = new Statistics();
-      stats.addNumberOfBytes(Utilities.getSize(alias, getConf().getTable(), conf,
-          this, getConf().getPruningPredicate()));
-      this.getConf().setStatistics(stats);
-    }
-    return stats;
-  }
-
-  @Override
   public boolean supportSkewJoinOptimization() {
     return true;
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 26 08:19:25 2013
@@ -837,7 +837,6 @@ public final class Utilities {
     // workaround for java 1.5
     e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
     e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-    e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
     e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
     e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
 
@@ -2155,12 +2154,6 @@ public final class Utilities {
                 HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf,
                     partDesc.getOverlayedProperties().getProperty(
                     hive_metastoreConstants.META_TABLE_STORAGE));
-                if (handler == null) {
-                  // native table
-                  FileSystem fs = p.getFileSystem(myConf);
-                  resultMap.put(pathStr, fs.getContentSummary(p));
-                  return;
-                }
                 if (handler instanceof InputEstimator) {
                   long total = 0;
                   TableDesc tableDesc = partDesc.getTableDesc();
@@ -2176,6 +2169,8 @@ public final class Utilities {
                   }
                   resultMap.put(pathStr, new ContentSummary(total, -1, -1));
                 }
+                FileSystem fs = p.getFileSystem(myConf);
+                resultMap.put(pathStr, fs.getContentSummary(p));
               } catch (Exception e) {
                 // We safely ignore this exception for summary data.
                 // We don't update the cache to protect it from polluting other
@@ -2344,12 +2339,8 @@ public final class Utilities {
   }
 
   public static StatsPublisher getStatsPublisher(JobConf jc) {
-    String statsImplementationClass = HiveConf.getVar(jc, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    if (StatsFactory.setImplementation(statsImplementationClass, jc)) {
-      return StatsFactory.getStatsPublisher();
-    } else {
-      return null;
-    }
+    StatsFactory factory = StatsFactory.newFactory(jc);
+    return factory == null ? null : factory.getStatsPublisher();
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov 26 08:19:25 2013
@@ -82,7 +82,6 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.log4j.Appender;
@@ -185,17 +184,10 @@ public class ExecDriver extends Task<Map
    * @return true if fatal errors happened during job execution, false otherwise.
    */
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
-    for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
-      if (op.checkFatalErrors(ctrs, errMsg)) {
-        return true;
-      }
-    }
-    if (work.getReduceWork() != null) {
-      if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) {
-        return true;
-      }
-    }
-    return false;
+     Counters.Counter cntr = ctrs.findCounter(
+        HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
+        Operator.HIVECOUNTERFATAL);
+    return cntr != null && cntr.getValue() > 0;
   }
 
    /**
@@ -414,9 +406,9 @@ public class ExecDriver extends Task<Map
       if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
         // initialize stats publishing table
         StatsPublisher statsPublisher;
-        String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS);
-        if (StatsFactory.setImplementation(statsImplementationClass, job)) {
-          statsPublisher = StatsFactory.getStatsPublisher();
+        StatsFactory factory = StatsFactory.newFactory(job);
+        if (factory != null) {
+          statsPublisher = factory.getStatsPublisher();
           if (!statsPublisher.init(job)) { // creating stats table if not exists
             if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
               throw
@@ -816,16 +808,6 @@ public class ExecDriver extends Task<Map
   }
 
   @Override
-  public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
-    for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
-      op.updateCounters(ctrs);
-    }
-    if (work.getReduceWork() != null) {
-      work.getReduceWork().getReducer().updateCounters(ctrs);
-    }
-  }
-
-  @Override
   public void logPlanProgress(SessionState ss) throws IOException {
     ss.getHiveHistory().logPlanProgress(queryPlan);
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Nov 26 08:19:25 2013
@@ -258,7 +258,7 @@ public class ExecReducer extends MapRedu
           }
         }
         try {
-          reducer.process(row, tag);
+          reducer.processOp(row, tag);
         } catch (Exception e) {
           String rowString = null;
           try {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 26 08:19:25 2013
@@ -34,8 +34,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -80,14 +81,6 @@ public class HadoopJobExecHelper {
     reduceProgress = reduceProgress == 100 ? (int)Math.floor(rj.reduceProgress() * 100) : reduceProgress;
     task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
     task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
-    if (ctrs == null) {
-      // hadoop might return null if it cannot locate the job.
-      // we may still be able to retrieve the job status - so ignore
-      return;
-    }
-    if(callBackObj != null) {
-      callBackObj.updateCounters(ctrs, rj);
-    }
   }
 
   /**
@@ -200,6 +193,7 @@ public class HadoopJobExecHelper {
     }
   }
 
+  @SuppressWarnings("deprecation")
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     if (ctrs == null) {
       // hadoop might return null if it cannot locate the job.
@@ -207,7 +201,9 @@ public class HadoopJobExecHelper {
       return false;
     }
     // check for number of created files
-    long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+    Counters.Counter cntr = ctrs.findCounter(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP),
+        Operator.HIVECOUNTERCREATEDFILES);
+    long numFiles = cntr != null ? cntr.getValue() : 0;
     long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
     if (numFiles > upperLimit) {
       errMsg.append("total number of created files now is " + numFiles + ", which exceeds ").append(upperLimit);
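
On the client side the same group/name pair is used to read values back out of the job's Counters, as checkFatalErrors() now does for CREATED_FILES here and for FATAL_ERROR in ExecDriver. A minimal sketch of that lookup as a standalone helper; the class and method names are hypothetical:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.JobConf;

    // Sketch only: read a Hive operator counter by group/name from job counters.
    final class HiveCounterLookupSketch {
      static long read(JobConf job, Counters ctrs, String counterName) {
        String group = HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP);
        Counters.Counter cntr = ctrs.findCounter(group, counterName);
        return cntr != null ? cntr.getValue() : 0L;
      }
    }

    // e.g. read(job, ctrs, Operator.HIVECOUNTERFATAL) > 0 signals a fatal error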

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHook.java Tue Nov 26 08:19:25 2013
@@ -22,12 +22,10 @@ import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.RunningJob;
 
 @SuppressWarnings("deprecation")
 public interface HadoopJobExecHook {
   
-  public void updateCounters(Counters ctrs, RunningJob rj) throws IOException;
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg);
   public void logPlanProgress(SessionState ss) throws IOException;
   

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Nov 26 08:19:25 2013
@@ -230,8 +230,7 @@ public class MapredLocalTask extends Tas
 
 
       if(ShimLoader.getHadoopShims().isSecurityEnabled() &&
-          conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) == true
-          ){
+          ShimLoader.getHadoopShims().isLoginKeytabBased()) {
         //If kerberos security is enabled, and HS2 doAs is enabled,
         // then additional params need to be set so that the command is run as
         // intended user
@@ -359,7 +358,7 @@ public class MapredLocalTask extends Tas
           forwardOp.close(false);
           break;
         }
-        forwardOp.process(row.o, 0);
+        forwardOp.processOp(row.o, 0);
         // check if any operator had a fatal error or early exit during
         // execution
         if (forwardOp.getDone()) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Nov 26 08:19:25 2013
@@ -619,9 +619,9 @@ public class DagUtils {
     // initialize stats publisher if necessary
     if (work.isGatheringStats()) {
       StatsPublisher statsPublisher;
-      String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-      if (StatsFactory.setImplementation(statsImplementationClass, conf)) {
-        statsPublisher = StatsFactory.getStatsPublisher();
+      StatsFactory factory = StatsFactory.newFactory(conf);
+      if (factory != null) {
+        statsPublisher = factory.getStatsPublisher();
         if (!statsPublisher.init(conf)) { // creating stats table if not exists
           if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
             throw

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Nov 26 08:19:25 2013
@@ -295,7 +295,7 @@ public class ReduceRecordProcessor  exte
         row.add(valueObj);
 
         try {
-          reducer.process(row, tag);
+          reducer.processOp(row, tag);
         } catch (Exception e) {
           String rowString = null;
           try {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Nov 26 08:19:25 2013
@@ -98,16 +98,6 @@ public class VectorFileSinkOperator exte
       }
     }
 
-    // Since File Sink is a terminal operator, forward is not called - so,
-    // maintain the number of output rows explicitly
-    if (counterNameToEnum != null) {
-      ++outputRows;
-      if (outputRows % 1000 == 0) {
-        incrCounter(numOutputRowsCntr, outputRows);
-        outputRows = 0;
-      }
-    }
-
     try {
       updateProgress();
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Nov 26 08:19:25 2013
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -100,16 +101,22 @@ public class VectorGroupByOperator exten
    * Sum of batch size processed (ie. rows).
    */
   private transient long sumBatchSize;
+  
+  /**
+   * Max number of entries in the vector group by aggregation hashtables.
+   * Exceeding this will trigger a flush regardless of memory pressure.
+   */
+  private transient int maxHtEntries = 1000000;
 
   /**
    * The number of new entries that must be added to the hashtable before a memory size check.
    */
-  private static final int FLUSH_CHECK_THRESHOLD = 10000;
+  private transient int checkInterval = 10000;
 
   /**
    * Percent of entries to flush when memory threshold exceeded.
    */
-  private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f;
+  private transient float percentEntriesToFlush = 0.1f;
 
   /**
    * The global key-aggregation hash map.
@@ -139,6 +146,16 @@ public class VectorGroupByOperator exten
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
+    
+    // hconf is null in unit testing
+    if (null != hconf) {
+      this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
+      this.checkInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
+      this.maxHtEntries = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+    }
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -226,8 +243,21 @@ public class VectorGroupByOperator exten
     processAggregators(batch);
 
     //Flush if memory limits were reached
-    if (shouldFlush(batch)) {
+    // We keep flushing until the memory is under the threshold
+    int preFlushEntriesCount = numEntriesHashTable;
+    while (shouldFlush(batch)) {
       flush(false);
+      
+      //Validate that some progress is being made
+      if (!(numEntriesHashTable < preFlushEntriesCount)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Flush did not progress: %d entries before, %d entries after",
+              preFlushEntriesCount,
+              numEntriesHashTable));
+        }
+        break;
+      }
+      preFlushEntriesCount = numEntriesHashTable;
     }
 
     if (sumBatchSize == 0 && 0 != batch.size) {
@@ -247,7 +277,7 @@ public class VectorGroupByOperator exten
   private void flush(boolean all) throws HiveException {
 
     int entriesToFlush = all ? numEntriesHashTable :
-      (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
+      (int)(numEntriesHashTable * this.percentEntriesToFlush);
     int entriesFlushed = 0;
 
     if (LOG.isDebugEnabled()) {
@@ -309,14 +339,18 @@ public class VectorGroupByOperator exten
    * Returns true if the memory threshold for the hash table was reached.
    */
   private boolean shouldFlush(VectorizedRowBatch batch) {
-    if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD ||
-        batch.size == 0) {
+    if (batch.size == 0) {
       return false;
     }
-    // Were going to update the average variable row size by sampling the current batch
-    updateAvgVariableSize(batch);
-    numEntriesSinceCheck = 0;
-    return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
+    //numEntriesSinceCheck is the number of entries added to the hash table
+    // since the last time we checked the average variable size
+    if (numEntriesSinceCheck >= this.checkInterval) {
+      // We're going to update the average variable row size by sampling the current batch
+      updateAvgVariableSize(batch);
+      numEntriesSinceCheck = 0;
+    }
+    return numEntriesHashTable > this.maxHtEntries ||
+        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
   }
 
   /**
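
The group-by flush parameters above are now read from configuration instead of being compile-time constants. A minimal sketch of overriding them on a HiveConf before running a vectorized query; the helper class and the chosen values are illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;

    // Sketch only: tune the vectorized group-by flush knobs introduced above.
    final class GroupByFlushTuningSketch {
      static HiveConf tunedConf() {
        HiveConf conf = new HiveConf();
        // flush 20% of hashtable entries once the memory threshold is hit
        conf.setFloat(HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT.varname, 0.2f);
        // re-sample the average variable row size every 5000 new entries
        conf.setInt(HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL.varname, 5000);
        // hard cap on hashtable entries before a flush is forced
        conf.setInt(HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES.varname, 500000);
        return conf;
      }
    }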

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java?rev=1545564&r1=1545563&r2=1545564&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriter.java Tue Nov 26 08:19:25 2013
@@ -33,4 +33,6 @@ public interface VectorExpressionWriter 
   Object writeValue(long value) throws HiveException;
   Object writeValue(double value) throws HiveException;
   Object writeValue(byte[] value, int start, int length) throws HiveException;
+  Object setValue(Object row, ColumnVector column, int columnRow) throws HiveException;
+  Object initValue(Object ost) throws HiveException;
 }