You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2012/09/30 22:41:03 UTC

svn commit: r1392105 [1/7] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimiz...

Author: heyongqiang
Date: Sun Sep 30 20:41:01 2012
New Revision: 1392105

URL: http://svn.apache.org/viewvc?rev=1392105&view=rev
Log:
HIVE-2206:add a new optimizer for query correlation discovery and optimization (Yin Huai via He Yongqiang)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/CorrelationOptimizerUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
    hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer2.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer3.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer4.q.out
    hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer5.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sun Sep 30 20:41:01 2012
@@ -495,6 +495,7 @@ public class HiveConf extends Configurat
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+    HIVEOPTCORRELATION("hive.optimize.correlation", false), // exploit intra-query correlations
 
     // optimize skewed join by changing the query plan at compile time
     HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME("hive.optimize.skewjoin.compiletime", false),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sun Sep 30 20:41:01 2012
@@ -928,6 +928,12 @@
 </property>
 
 <property>
+  <name>hive.optimize.correlation</name>
+  <value>false</value>
+  <description>exploit intra-query correlations.</description>
+</property>
+
+<property>
   <name>hive.exec.dynamic.partition</name>
   <value>true</value>
   <description>Whether or not to allow dynamic partitions in DML/DDL.</description>

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Sun Sep 30 20:41:01 2012
@@ -28,7 +28,10 @@ public enum OperatorType implements org.
   LATERALVIEWJOIN(14),
   LATERALVIEWFORWARD(15),
   HASHTABLESINK(16),
-  HASHTABLEDUMMY(17);
+  HASHTABLEDUMMY(17),
+  CORRELATIONCOMPOSITE(18),
+  CORRELATIONLOCALSIMULATIVEREDUCESINK(19),
+  CORRELATIONREDUCERDISPATCH(20);
 
   private final int value;
 
@@ -85,6 +88,12 @@ public enum OperatorType implements org.
         return HASHTABLESINK;
       case 17:
         return HASHTABLEDUMMY;
+      case 18:
+        return CORRELATIONCOMPOSITE;
+      case 19:
+        return CORRELATIONLOCALSIMULATIVEREDUCESINK;
+      case 20:
+        return CORRELATIONREDUCERDISPATCH;
       default:
         return null;
     }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BaseReduceSinkOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BaseReduceSinkOperator
+ **/
+public abstract class BaseReduceSinkOperator<T extends BaseReduceSinkDesc> extends
+  TerminalOperator<T> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  protected static final Log LOG = LogFactory.getLog(BaseReduceSinkOperator.class
+      .getName());
+
+  /**
+   * The evaluators for the key columns. Key columns decide the sort order on
+   * the reducer side. Key columns are passed to the reducer in the "key".
+   */
+  protected transient ExprNodeEvaluator[] keyEval;
+  /**
+   * The evaluators for the value columns. Value columns are passed to reducer
+   * in the "value".
+   */
+  protected transient ExprNodeEvaluator[] valueEval;
+  /**
+   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+   * Hive language). Partition columns decide the reducer that the current row
+   * goes to. Partition columns are not passed to reducer.
+   */
+  protected transient ExprNodeEvaluator[] partitionEval;
+
+  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+  // ready
+  protected transient Serializer keySerializer;
+  protected transient boolean keyIsText;
+  protected transient Serializer valueSerializer;
+  protected transient int tag;
+  protected transient byte[] tagByte = new byte[1];
+  protected transient int numDistributionKeys;
+  protected transient int numDistinctExprs;
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    // Build evaluators and serializers from conf, then initialize the child operators.
+    try {
+      keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+      int pos = 0;
+      for (ExprNodeDesc keyCol : conf.getKeyCols()) {
+        keyEval[pos++] = ExprNodeEvaluatorFactory.get(keyCol);
+      }
+
+      numDistributionKeys = conf.getNumDistributionKeys();
+      distinctColIndices = conf.getDistinctColumnIndices();
+      numDistinctExprs = distinctColIndices.size();
+
+      valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+      pos = 0;
+      for (ExprNodeDesc valueCol : conf.getValueCols()) {
+        valueEval[pos++] = ExprNodeEvaluatorFactory.get(valueCol);
+      }
+
+      partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+      pos = 0;
+      for (ExprNodeDesc partitionCol : conf.getPartitionCols()) {
+        partitionEval[pos++] = ExprNodeEvaluatorFactory.get(partitionCol);
+      }
+
+      tag = conf.getTag();
+      tagByte[0] = (byte) tag;
+      LOG.info("Using tag = " + tag);
+
+      // The key serializer is an instance of the key table descriptor's SerDe class.
+      TableDesc keyDesc = conf.getKeySerializeInfo();
+      keySerializer = (Serializer) keyDesc.getDeserializerClass().newInstance();
+      keySerializer.initialize(null, keyDesc.getProperties());
+      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+      // The value serializer is created the same way from the value table descriptor.
+      TableDesc valueDesc = conf.getValueSerializeInfo();
+      valueSerializer = (Serializer) valueDesc.getDeserializerClass().newInstance();
+      valueSerializer.initialize(null, valueDesc.getProperties());
+
+      isFirstRow = true;
+      initializeChildren(hconf);
+    } catch (Exception cause) {
+      cause.printStackTrace();
+      throw new RuntimeException(cause);
+    }
+  }
+
+  protected transient InspectableObject tempInspectableObject = new InspectableObject();
+  protected transient HiveKey keyWritable = new HiveKey();
+  protected transient Writable value;
+
+  protected transient StructObjectInspector keyObjectInspector;
+  protected transient StructObjectInspector valueObjectInspector;
+  protected transient ObjectInspector[] partitionObjectInspectors;
+
+  protected transient Object[][] cachedKeys;
+  protected transient Object[] cachedValues;
+  protected transient List<List<Integer>> distinctColIndices;
+
+  protected boolean isFirstRow;
+
+  protected transient Random random;
+
+  /**
+   * Initializes the given evaluators against rowInspector and returns a
+   * StructObjectInspector over their outputs, named by outputColNames. The first
+   * "length" evaluators go through initEvaluators; if more evaluators exist, one extra
+   * Union field is appended with one struct alternative per distinct column group.
+   *
+   * If distinctColIndices is empty, the object inspector is same as
+   * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
+   */
+  protected static StructObjectInspector initEvaluatorsAndReturnStruct(
+      ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
+      List<String> outputColNames,
+      int length, ObjectInspector rowInspector)
+      throws HiveException {
+    // A union field is only needed when there are evaluators beyond the first "length".
+    boolean hasUnionField = evals.length > length;
+    int fieldCount = hasUnionField ? length + 1 : evals.length;
+    List<ObjectInspector> fields = new ArrayList<ObjectInspector>(fieldCount);
+
+    // keys
+    fields.addAll(Arrays.asList(initEvaluators(evals, 0, length, rowInspector)));
+
+    if (hasUnionField) {
+      // union keys: one struct inspector per group of distinct columns
+      List<ObjectInspector> alternatives = new ArrayList<ObjectInspector>();
+      for (List<Integer> distinctCols : distinctColIndices) {
+        List<String> names = new ArrayList<String>();
+        List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
+        for (int k = 0; k < distinctCols.size(); k++) {
+          names.add(HiveConf.getColumnInternalName(k));
+          eois.add(evals[distinctCols.get(k)].initialize(rowInspector));
+        }
+        alternatives.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+      }
+      UnionObjectInspector uoi =
+          ObjectInspectorFactory.getStandardUnionObjectInspector(alternatives);
+      fields.add(uoi);
+    }
+    // Wrap all field inspectors into the struct that is handed back to the caller.
+    return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, fields);
+  }
+
+  @Override
+  public abstract void processOp(Object row, int tag) throws HiveException;
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return "BaseReduceSink";
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationCompositeOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Correlation composite operator implementation. This operator is used only in map phase for
+ * sharing table scan. Suppose that there are multiple operation paths (e.g. two different
+ * predicates on a table ) that share a common table. A row will be processed by these operation
+ * paths. To tag which operation paths actually forward this row, CorrelationCompositeOperator is
+ * used. For a row, this operator will buffer forwarded rows from its parents and then tag this row
+ * with an operation path tag indicating which paths forwarded this row. Right now, since operation
+ * path tag used in ReduceSinkOperator has 1 byte, this operator can have at most 8 parents
+ * (operation paths). For example, suppose that the common table is T and predicates P1 and P2 will
+ * be used in sub-queries SQ1 and SQ2, respectively. The CorrelationCompositeOperator
+ * will apply P1 and P2 on the row and tag the record based on if P1 or P2 is true.
+ **/
+public class CorrelationCompositeOperator extends Operator<CorrelationCompositeDesc> implements
+  Serializable {
+
+  public static enum Counter {
+    FORWARDED
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  private ReduceSinkOperator correspondingReduceSinkOperators;
+
+  private transient final LongWritable forwarded_count;
+
+  private transient boolean isFirstRow;
+
+  private int[] allOperationPathTags;
+
+  private Object[] rowBuffer; // buffer the output from multiple parents
+
+  public CorrelationCompositeOperator() {
+    super();
+    forwarded_count = new LongWritable();
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    isFirstRow = true;
+    // one buffer slot per parent operator
+    rowBuffer = new Object[parentOperators.size()];
+    correspondingReduceSinkOperators = conf.getCorrespondingReduceSinkOperator();
+    allOperationPathTags = conf.getAllOperationPathTags();
+    statsMap.put(Counter.FORWARDED, forwarded_count);
+
+    // buffered rows are standard Java copies, so the output inspector is converted as well
+    outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(
+        outputObjInspector, ObjectInspectorCopyOption.JAVA);
+    initializeChildren(hconf);
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    rowBuffer[tag] =
+        ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[tag],
+            ObjectInspectorCopyOption.JAVA);
+  }
+
+  private void evaluateBuffer() throws HiveException {
+    // Gather the tags of all slots that hold a row; the first such row is the one sent on.
+    List<Integer> pathTags = new ArrayList<Integer>();
+    Object firstBuffered = null;
+    for (int slot = 0; slot < rowBuffer.length; slot++) {
+      if (rowBuffer[slot] == null) {
+        continue;
+      }
+      pathTags.add(allOperationPathTags[slot]);
+      if (firstBuffered == null) {
+        firstBuffered = rowBuffer[slot];
+      }
+    }
+    if (firstBuffered != null) {
+      assert correspondingReduceSinkOperators != null;
+      correspondingReduceSinkOperators.setOperationPathTags(pathTags);
+      forwarded_count.set(forwarded_count.get() + 1);
+      forward(firstBuffered, null);
+    }
+    for (int slot = 0; slot < rowBuffer.length; slot++) {
+      rowBuffer[slot] = null;
+    }
+  }
+
+  @Override
+  public void setRowNumber(long rowNumber) throws HiveException {
+    this.rowNumber = rowNumber;
+    if (childOperators == null) {
+      return;
+    }
+    for (Operator<?> child : childOperatorsArray) { // pass the new row number down
+      assert rowNumber >= child.getRowNumber();
+      if (rowNumber != child.getRowNumber()) {
+        child.setRowNumber(rowNumber);
+      }
+    }
+    if (!isFirstRow) {
+      evaluateBuffer(); // flush the rows buffered for the previous row
+      return;
+    }
+    for (int slot = 0; slot < rowBuffer.length; slot++) {
+      rowBuffer[slot] = null;
+    }
+    isFirstRow = false;
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    if (!abort) {
+      evaluateBuffer(); // flush the rows still buffered for the last input row
+    }
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "CCO";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.CORRELATIONCOMPOSITE;
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationLocalSimulativeReduceSinkOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.eclipse.jdt.core.dom.ThisExpression;
+
+/**
+ * CorrelationLocalSimulativeReduceSinkOperator simulates a ReduceSinkOperator and sends output to
+ * another operator (JOIN or GBY). CorrelationLocalSimulativeReduceSinkOperator is used only in
+ * reduce phase. Basically, it is a bridge from one JOIN or GBY operator to another JOIN or GBY
+ * operator. A CorrelationLocalSimulativeReduceSinkOperator takes care of the startGroup and
+ * endGroup actions of its succeeding JOIN or GBY operator.
+ * Example: A query involves a JOIN operator and a GBY operator and the GBY operator consumes the
+ * output of the JOIN operator. In this case, if join keys and group by keys are the same, we do not
+ * need to shuffle the data again, since data has been already partitioned by the JOIN operator.
+ * Thus, in CorrelationOptimizer, the ReduceSinkOperator between JOIN and GBY operator will be
+ * replaced by a CorrelationLocalSimulativeReduceSinkOperator and the JOIN operator and GBY operator
+ * will be executed in a single reduce phase.
+ **/
+public class CorrelationLocalSimulativeReduceSinkOperator
+  extends BaseReduceSinkOperator<CorrelationLocalSimulativeReduceSinkDesc> {
+
+  private static final long serialVersionUID = 1L;
+  protected static final Log LOG = LogFactory.getLog(
+      CorrelationLocalSimulativeReduceSinkOperator.class.getName());
+
+  private transient TableDesc keyTableDesc;
+
+  private transient Deserializer inputKeyDeserializer;
+
+  private transient SerDe inputValueDeserializer;
+
+  private transient ByteWritable tagWritable;
+
+  private transient ObjectInspector outputKeyObjectInspector;
+  private transient ObjectInspector outputValueObjectInspector;
+
+  private List<Object> forwardedRow;
+  private Object keyObject;
+  private Object valueObject;
+
+  private BytesWritable groupKey;
+
+  private static String[] fieldNames;
+
+  static {
+    Utilities.ReduceField[] reduceFields = Utilities.ReduceField.values(); // declaration order
+    fieldNames = new String[reduceFields.length];
+    for (int pos = 0; pos < reduceFields.length; pos++) {
+      fieldNames[pos] = reduceFields[pos].toString();
+    }
+  }
+
+  public CorrelationLocalSimulativeReduceSinkOperator() {
+  }
+
+  /**
+   * Builds the evaluators, SerDes and object inspectors described by conf. The key table
+   * descriptor is kept in the keyTableDesc field, which processOp reads when reporting errors.
+   */
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    forwardedRow = new ArrayList<Object>(3);
+    tagByte = new byte[1];
+    tagWritable = new ByteWritable();
+    tempInspectableObject = new InspectableObject();
+    keyWritable = new HiveKey();
+    assert childOperatorsArray.length == 1;
+    try {
+      keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
+      int i = 0;
+      for (ExprNodeDesc e : conf.getKeyCols()) {
+        keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+      }
+
+      numDistributionKeys = conf.getNumDistributionKeys();
+      distinctColIndices = conf.getDistinctColumnIndices();
+      numDistinctExprs = distinctColIndices.size();
+
+      valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
+      i = 0;
+      for (ExprNodeDesc e : conf.getValueCols()) {
+        valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+      }
+
+      tag = conf.getTag();
+      tagByte[0] = (byte) tag;
+      tagWritable.set(tagByte[0]);
+      LOG.info("Using tag = " + tag);
+
+      // Assign the field (not a shadowing local) so that it is non-null in processOp.
+      keyTableDesc = conf.getKeySerializeInfo();
+      keySerializer = (Serializer) keyTableDesc.getDeserializerClass().newInstance();
+      keySerializer.initialize(null, keyTableDesc.getProperties());
+      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+      // A second SerDe instance turns the serialized key back into the forwarded key object.
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+      outputKeyObjectInspector = inputKeyDeserializer.getObjectInspector();
+
+      TableDesc valueTableDesc = conf.getValueSerializeInfo();
+      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass().newInstance();
+      valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+      inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+          valueTableDesc.getDeserializerClass(), null);
+      inputValueDeserializer.initialize(null, valueTableDesc.getProperties());
+      outputValueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+      ObjectInspector rowInspector = inputObjInspectors[0];
+
+      keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, distinctColIndices,
+          conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
+      valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+          .getOutputValueColumnNames(), rowInspector);
+      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
+      cachedKeys = new Object[numKeys][keyLen];
+      cachedValues = new Object[valueEval.length];
+      assert cachedKeys.length == 1;
+      // The forwarded row is a struct of (key, value, tag) named after Utilities.ReduceField.
+      List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+      ois.add(outputKeyObjectInspector);
+      ois.add(outputValueObjectInspector);
+      ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+
+      outputObjInspector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+
+      LOG.info("Simulative ReduceSink inputObjInspectors"
+          + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+
+      LOG.info("Simulative ReduceSink outputObjInspectors "
+          + this.getChildOperators().get(0).getParentOperators().indexOf(this) +
+          " " + ((StructObjectInspector) outputObjInspector).getTypeName());
+
+      initializeChildren(hconf);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    try {
+      // Evaluate the value columns of the incoming row (the tag parameter is not used here)
+      for (int i = 0; i < valueEval.length; i++) {
+        cachedValues[i] = valueEval[i].evaluate(row);
+      }
+      // Serialize the value, then deserialize it into the value object that is forwarded
+      value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+      valueObject = inputValueDeserializer.deserialize(value);
+
+      // Evaluate the distribution keys (the first numDistributionKeys key evaluators)
+      Object[] distributionKeys = new Object[numDistributionKeys];
+      for (int i = 0; i < numDistributionKeys; i++) {
+        distributionKeys[i] = keyEval[i].evaluate(row);
+      }
+
+      if (numDistinctExprs > 0) {
+        // with distinct key(s): each cached key is the distribution keys plus one union field
+        for (int i = 0; i < numDistinctExprs; i++) {
+          System.arraycopy(distributionKeys, 0, cachedKeys[i], 0, numDistributionKeys);
+          Object[] distinctParameters =
+              new Object[distinctColIndices.get(i).size()];
+          for (int j = 0; j < distinctParameters.length; j++) {
+            distinctParameters[j] =
+                keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
+          }
+          cachedKeys[i][numDistributionKeys] =
+              new StandardUnion((byte) i, distinctParameters);
+        }
+      } else {
+        // no distinct key
+        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+      }
+      // Serialize each cached key into keyWritable and forward one (key, value, tag) row per key
+      for (int i = 0; i < cachedKeys.length; i++) {
+        if (keyIsText) {
+          Text key = (Text) keySerializer.serialize(cachedKeys[i],
+              keyObjectInspector);
+          keyWritable.set(key.getBytes(), 0, key.getLength());
+        } else {
+          // Must be BytesWritable
+          BytesWritable key = (BytesWritable) keySerializer.serialize(
+              cachedKeys[i], keyObjectInspector);
+          keyWritable.set(key.getBytes(), 0, key.getLength());
+        }
+        // groupKey remembers the last key seen by this operator (null before the first row)
+        if (!keyWritable.equals(groupKey)) {
+          try {
+            keyObject = inputKeyDeserializer.deserialize(keyWritable);
+          } catch (Exception e) {
+            throw new HiveException(
+                "Hive Runtime Error: Unable to deserialize reduce input key from "
+                    + Utilities.formatBinaryString(keyWritable.get(), 0,
+                        keyWritable.getSize()) + " with properties "
+                        + keyTableDesc.getProperties(), e);
+          }
+          if (groupKey == null) { // the first group
+            groupKey = new BytesWritable();
+          } else {
+            // if its child has not been ended, end it
+            if (!keyWritable.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+              childOperatorsArray[0].endGroup();
+            }
+          }
+          groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+          if (!groupKey.equals(childOperatorsArray[0].getBytesWritableGroupKey())) {
+            childOperatorsArray[0].startGroup();
+            childOperatorsArray[0].setGroupKeyObject(keyObject);
+            childOperatorsArray[0].setBytesWritableGroupKey(groupKey);
+          }
+        }
+        forwardedRow.clear();
+        forwardedRow.add(keyObject);
+        forwardedRow.add(valueObject);
+        forwardedRow.add(tagWritable);
+        forward(forwardedRow, outputObjInspector);
+      }
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    // On a normal close, end the only child's group once all of its initialized parents
+    // are closed.
+    if (!abort && childOperatorsArray[0].allInitializedParentsAreClosed()) {
+      Operator<? extends OperatorDesc> onlyChild = childOperatorsArray[0];
+      LOG.info("All parents of " + onlyChild.getName() + " (id: " + onlyChild.getIdentifier() +
+          ") has been closed. Invoke its endGroup");
+      onlyChild.endGroup();
+    }
+  }
+
+  @Override
+  public void startGroup() throws HiveException {
+    // do nothing
+  }
+
+  @Override
+  public void endGroup() throws HiveException {
+    // do nothing
+  }
+
+  @Override
+  public void setGroupKeyObject(Object keyObject) {
+    // do nothing
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "CLSReduceSink";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.CORRELATIONLOCALSIMULATIVEREDUCESINK;
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CorrelationReducerDispatchOperator.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+/**
+ * CorrelationReducerDispatchOperator is an operator used by MapReduce jobs optimized by
+ * CorrelationOptimizer. If used, CorrelationReducerDispatchOperator is the first operator in the
+ * reduce phase. In the case that multiple operation paths are merged into a single one, it
+ * dispatches each record to the corresponding JOIN or GBY operators. Every child of this operator
+ * is associated with a DispatchHandler, which evaluates the input row of this operator and then
+ * selects the corresponding fields for its associated child.
+ */
+public class CorrelationReducerDispatchOperator extends Operator<CorrelationReducerDispatchDesc>
+  implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static String[] fieldNames;
+  static {
+    List<String> fieldNameArray = new ArrayList<String>();
+    for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
+      fieldNameArray.add(r.toString());
+    }
+    fieldNames = fieldNameArray.toArray(new String[0]);
+  }
+
+  protected static class DispatchHandler {
+
+    protected Log l4j = LogFactory.getLog(this.getClass().getName());
+
+    private final ObjectInspector[] inputObjInspector;
+    private ObjectInspector outputObjInspector;
+    private ObjectInspector keyObjInspector;
+    private ObjectInspector valueObjInspector;
+    private final byte inputTag;
+    private final byte outputTag;
+    private final byte childIndx;
+    private final ByteWritable outputTagByteWritable;
+    private final SelectDesc keySelectDesc;
+    private final SelectDesc valueSelectDesc;
+    private ExprNodeEvaluator[] keyEval;
+    private ExprNodeEvaluator[] eval;
+
+    // counters for debugging
+    private transient long cntr = 0;
+    private transient long nextCntr = 1;
+
+    private long getNextCntr(long cntr) {
+      // Computes the row count at which the next progress message is due: the
+      // threshold grows tenfold while it is below one million and then advances
+      // by one million at a time.
+      if (cntr >= 1000000) {
+        return cntr + 1000000;
+      }
+      return 10 * cntr;
+    }
+
+    public long getCntr() {
+      return this.cntr;
+    }
+
+    private final Log LOG;
+    private final boolean isLogInfoEnabled;
+    private final String id;
+
+    public DispatchHandler(ObjectInspector[] inputObjInspector, byte inputTag, byte childIndx,
+        byte outputTag,
+        SelectDesc valueSelectDesc, SelectDesc keySelectDesc, Log LOG, String id)
+            throws HiveException {
+      this.inputObjInspector = inputObjInspector;
+      assert this.inputObjInspector.length == 1;
+      this.inputTag = inputTag;
+      this.childIndx = childIndx;
+      this.outputTag = outputTag;
+      this.valueSelectDesc = valueSelectDesc;
+      this.keySelectDesc = keySelectDesc;
+      this.outputTagByteWritable = new ByteWritable(outputTag);
+      this.LOG = LOG;
+      this.isLogInfoEnabled = LOG.isInfoEnabled();
+      this.id = id;
+      init();
+    }
+
+    /** Sets up the key/value evaluators and the (key, value, tag) output inspector. */
+    private void init() throws HiveException {
+      // The input row inspector is a struct: field 0 describes the key, field 1 the value.
+      StructObjectInspector rowOI = (StructObjectInspector) inputObjInspector[0];
+      List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+      if (keySelectDesc.isSelStarNoCompute()) {
+        // Reuse the key field's inspector; the row inspector is not a List and cannot be cast.
+        ois.add(rowOI.getAllStructFieldRefs().get(0).getFieldObjectInspector());
+      } else {
+        List<ExprNodeDesc> colList = this.keySelectDesc.getColList();
+        keyEval = new ExprNodeEvaluator[colList.size()];
+        for (int k = 0; k < colList.size(); k++) {
+          assert (colList.get(k) != null);
+          keyEval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+        }
+        keyObjInspector = initEvaluatorsAndReturnStruct(keyEval,
+            keySelectDesc.getOutputColumnNames(),
+            rowOI.getAllStructFieldRefs().get(0).getFieldObjectInspector());
+        ois.add(keyObjInspector);
+        l4j.info("Key: input tag " + (int) inputTag + ", output tag " + (int) outputTag
+            + ", SELECT inputOIForThisTag"
+            + rowOI.getTypeName());
+      }
+      if (valueSelectDesc.isSelStarNoCompute()) {
+        ois.add(rowOI.getAllStructFieldRefs().get(1).getFieldObjectInspector());
+      } else {
+        List<ExprNodeDesc> colList = this.valueSelectDesc.getColList();
+        eval = new ExprNodeEvaluator[colList.size()];
+        for (int k = 0; k < colList.size(); k++) {
+          assert (colList.get(k) != null);
+          eval[k] = ExprNodeEvaluatorFactory.get(colList.get(k));
+        }
+        valueObjInspector = initEvaluatorsAndReturnStruct(eval,
+            valueSelectDesc.getOutputColumnNames(),
+            rowOI.getAllStructFieldRefs().get(1).getFieldObjectInspector());
+        ois.add(valueObjInspector);
+        l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+            + ", SELECT inputOIForThisTag"
+            + rowOI.getTypeName());
+      }
+      ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
+      outputObjInspector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
+      l4j.info("input tag " + (int) inputTag + ", output tag " + (int) outputTag
+          + ", SELECT outputObjInspector"
+          + ((StructObjectInspector) outputObjInspector).getTypeName());
+    }
+
+    public ObjectInspector getOutputObjInspector() {
+      return outputObjInspector;
+    }
+
+    /**
+     * Turns one input row into the (key, value, output tag) row for the associated child.
+     */
+    public Object process(Object row) throws HiveException {
+      List<Object> outputRow = new ArrayList<Object>(3);
+      List thisRow = (List) row;
+      if (keySelectDesc.isSelStarNoCompute()) {
+        outputRow.add(thisRow.get(0));
+      } else {
+        // keyEval is only created for a computed key, so allocate here, not at method entry.
+        List<Object> keyOutput = new ArrayList<Object>(keyEval.length);
+        for (int j = 0; j < keyEval.length; j++) {
+          try {
+            keyOutput.add(keyEval[j].evaluate(thisRow.get(0)));
+          } catch (HiveException e) {
+            throw e;
+          } catch (RuntimeException e) {
+            throw new HiveException("Error evaluating "
+                + keySelectDesc.getColList().get(j).getExprString(), e);
+          }
+        }
+        outputRow.add(keyOutput);
+      }
+      if (valueSelectDesc.isSelStarNoCompute()) {
+        outputRow.add(thisRow.get(1));
+      } else {
+        // Likewise, eval is null when the value is passed through unchanged.
+        Object[] valueOutput = new Object[eval.length];
+        for (int j = 0; j < eval.length; j++) {
+          try {
+            valueOutput[j] = eval[j].evaluate(thisRow.get(1));
+          } catch (HiveException e) {
+            throw e;
+          } catch (RuntimeException e) {
+            throw new HiveException("Error evaluating "
+                + valueSelectDesc.getColList().get(j).getExprString(), e);
+          }
+        }
+        outputRow.add(valueOutput);
+      }
+      outputRow.add(outputTagByteWritable);
+      if (isLogInfoEnabled) {
+        cntr++;
+        if (cntr == nextCntr) {
+          LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+              + outputTag + "), forwarding " + cntr + " rows");
+          nextCntr = getNextCntr(cntr);
+        }
+      }
+      return outputRow;
+    }
+
+    public void printCloseOpLog() {
+      LOG.info(id + "(inputTag, childIndx, outputTag)=(" + inputTag + ", " + childIndx + ", "
+          + outputTag + "),  forwarded " + cntr + " rows");
+    }
+  }
+
+  // inputTag->(Child->List<outputTag>)
+  private Map<Integer, Map<Integer, List<Integer>>> dispatchConf;
+  // inputTag->(Child->List<SelectDesc>)
+  private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf;
+  // inputTag->(Child->List<SelectDesc>)
+  private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf;
+  // inputTag->(Child->List<DispatchHandler>)
+  private Map<Integer, Map<Integer, List<DispatchHandler>>> dispatchHandlers;
+  // Child->(outputTag->DispatchHandler)
+  private Map<Integer, Map<Integer, DispatchHandler>> child2OutputTag2DispatchHandlers;
+  // Child->Child's inputObjInspectors
+  private Map<Integer, ObjectInspector[]> childInputObjInspectors;
+
+  private int operationPathTag;
+  private int inputTag;
+
+  private Object[] lastDispatchedRows;
+  private int[] lastDispatchedTags;
+
+  /**
+   * Creates one DispatchHandler per (input tag, child, output tag) entry of the dispatch
+   * configuration and derives the input object inspectors handed to each child.
+   */
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    dispatchConf = conf.getDispatchConf();
+    dispatchValueSelectDescConf = conf.getDispatchValueSelectDescConf();
+    dispatchKeySelectDescConf = conf.getDispatchKeySelectDescConf();
+    dispatchHandlers = new HashMap<Integer, Map<Integer, List<DispatchHandler>>>();
+    for (Entry<Integer, Map<Integer, List<Integer>>> entry : dispatchConf.entrySet()) {
+      Map<Integer, List<DispatchHandler>> tmp =
+          new HashMap<Integer, List<DispatchHandler>>();
+      for (Entry<Integer, List<Integer>> child2outputTag : entry.getValue().entrySet()) {
+        tmp.put(child2outputTag.getKey(), new ArrayList<DispatchHandler>());
+        int indx = 0;
+        for (Integer outputTag : child2outputTag.getValue()) {
+          ObjectInspector[] thisInputObjectInspector =
+              new ObjectInspector[] {inputObjInspectors[entry.getKey()]};
+          Integer thisInputTag = entry.getKey();
+          Integer thisChildIndx = child2outputTag.getKey();
+          SelectDesc thisValueSelectDesc = dispatchValueSelectDescConf.get(thisInputTag)
+              .get(thisChildIndx).get(indx);
+          SelectDesc thisKeySelectDesc = dispatchKeySelectDescConf.get(thisInputTag)
+              .get(thisChildIndx).get(indx);
+          tmp.get(child2outputTag.getKey()).add(
+              new DispatchHandler(thisInputObjectInspector,
+                  thisInputTag.byteValue(), thisChildIndx.byteValue(), outputTag.byteValue(),
+                  thisValueSelectDesc, thisKeySelectDesc, LOG, id));
+          indx++;
+        }
+      }
+      dispatchHandlers.put(entry.getKey(), tmp);
+    }
+    // Index the handlers built above by child and then by output tag.
+    child2OutputTag2DispatchHandlers = new HashMap<Integer, Map<Integer, DispatchHandler>>();
+    for (Entry<Integer, Map<Integer, List<Integer>>> entry : dispatchConf.entrySet()) {
+      for (Entry<Integer, List<Integer>> child2outputTag : entry.getValue().entrySet()) {
+        if (!child2OutputTag2DispatchHandlers.containsKey(child2outputTag.getKey())) {
+          child2OutputTag2DispatchHandlers.put(child2outputTag.getKey(),
+              new HashMap<Integer, DispatchHandler>());
+        }
+        int indx = 0;
+        for (Integer outputTag : child2outputTag.getValue()) {
+          child2OutputTag2DispatchHandlers.get(child2outputTag.getKey()).
+            put(outputTag,
+              dispatchHandlers.get(entry.getKey()).get(child2outputTag.getKey()).get(indx));
+          indx++;
+        }
+      }
+    }
+    // A child's inspector array is indexed by output tag; output tag -1 is stored in slot 0,
+    childInputObjInspectors = new HashMap<Integer, ObjectInspector[]>();
+    for (Entry<Integer, Map<Integer, DispatchHandler>> entry : child2OutputTag2DispatchHandlers
+        .entrySet()) {
+      Integer l = Collections.max(entry.getValue().keySet()); // so never size below one slot.
+      ObjectInspector[] childObjInspectors = new ObjectInspector[Math.max(l.intValue(), 0) + 1];
+      for (Entry<Integer, DispatchHandler> e : entry.getValue().entrySet()) {
+        if (e.getKey().intValue() == -1) {
+          assert childObjInspectors.length == 1;
+          childObjInspectors[0] = e.getValue().getOutputObjInspector();
+        } else {
+          childObjInspectors[e.getKey().intValue()] = e.getValue().getOutputObjInspector();
+        }
+      }
+      childInputObjInspectors.put(entry.getKey(), childObjInspectors);
+    }
+    // No row has been dispatched yet: rows stay null and tags start at -1.
+    lastDispatchedRows = new Object[childOperatorsArray.length];
+    lastDispatchedTags = new int[childOperatorsArray.length];
+    Arrays.fill(lastDispatchedTags, -1);
+    initializeChildren(hconf);
+  }
+
+  // Each child is initialized with its own entry of childInputObjInspectors.
+  @Override
+  protected void initializeChildren(Configuration hconf) throws HiveException {
+    state = State.INIT;
+    LOG.info("Operator " + id + " " + getName() + " initialized");
+    if (childOperators == null) {
+      return;
+    }
+    LOG.info("Initializing children of " + id + " " + getName());
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      LOG.info("Initializing child " + i + " " + childOperatorsArray[i].getIdentifier() + " " +
+          childOperatorsArray[i].getName() +
+          " " + childInputObjInspectors.get(i).length);
+      childOperatorsArray[i].initialize(hconf, childInputObjInspectors.get(i));
+      if (reporter != null) {
+        childOperatorsArray[i].setReporter(reporter);
+      }
+    }
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    List<Object> thisRow = (List<Object>) row;
+    assert thisRow.size() == 4;
+    operationPathTag = ((ByteWritable) thisRow.get(3)).get();
+    inputTag = ((ByteWritable) thisRow.get(2)).get();
+    forward(thisRow.subList(0, 3), inputObjInspectors[inputTag]);
+  }
+
+  @Override
+  public void forward(Object row, ObjectInspector rowInspector)
+      throws HiveException {
+    if ((++outputRows % 1000) == 0) { // publish the output-row counter in batches of 1000
+      if (counterNameToEnum != null) {
+        incrCounter(numOutputRowsCntr, outputRows);
+        outputRows = 0;
+      }
+    }
+    // Children exist but their array was never built: initialization did not complete.
+    if (childOperatorsArray == null && childOperators != null) {
+      throw new HiveException("Internal Hive error during operator initialization.");
+    }
+    // Nothing to forward to when there are no children or this operator is already done.
+    if ((childOperatorsArray == null) || (getDone())) {
+      return;
+    }
+    // Bit i of operationPathTag decides whether child i receives the current row.
+    int childrenDone = 0;
+    int forwardFlag = 1;
+    assert childOperatorsArray.length <= 8;
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      Operator<? extends OperatorDesc> o = childOperatorsArray[i];
+      if (o.getDone()) {
+        childrenDone++;
+      } else {
+        int isProcess = (operationPathTag & (forwardFlag << i));
+        if (isProcess != 0) {
+          if (o.getName().equals(GroupByOperator.getOperatorName())) {
+            GroupByOperator gbyop = (GroupByOperator) o;
+            gbyop.setForcedForward(false); // a real row for this child: aggregate normally
+            if (!this.bytesWritableGroupKey.equals(o.getBytesWritableGroupKey())) {
+              o.setBytesWritableGroupKey(this.bytesWritableGroupKey);
+            }
+          }
+          for (int j = 0; j < dispatchHandlers.get(inputTag).get(i).size(); j++) {
+            Object dispatchedRow = dispatchHandlers.get(inputTag).get(i).get(j).process(row);
+            int dispatchedTag = dispatchConf.get(inputTag).get(i).get(j);
+            o.process(dispatchedRow, dispatchedTag);
+            lastDispatchedRows[i] = dispatchedRow; // remembered for a later forced forward
+            lastDispatchedTags[i] = dispatchedTag;
+          }
+        }
+        if (isProcess == 0 && o.getName().equals(GroupByOperator.getOperatorName())) {
+          if (lastDispatchedRows[i] != null &&
+              !this.bytesWritableGroupKey.equals(o.getBytesWritableGroupKey())) {
+            GroupByOperator gbyop = (GroupByOperator) o;
+            gbyop.setForcedForward(true); // group key moved on: replay the child's last row
+            o.setBytesWritableGroupKey(this.bytesWritableGroupKey);
+            o.process(lastDispatchedRows[i], lastDispatchedTags[i]);
+          }
+        }
+      }
+    }
+
+    // if all children are done, this operator is also done
+    if (childrenDone == childOperatorsArray.length) {
+      setDone(true);
+    }
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    // Log how many rows each DispatchHandler forwarded, walking the handlers per input tag
+    // and per child.
+    for (Map<Integer, List<DispatchHandler>> handlersByChild : dispatchHandlers.values()) {
+      for (List<DispatchHandler> handlersOfChild : handlersByChild.values()) {
+        for (DispatchHandler handler : handlersOfChild) {
+          handler.printCloseOpLog();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void setGroupKeyObject(Object keyObject) {
+    this.groupKeyObject = keyObject;
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setGroupKeyObject(keyObject);
+    }
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "CDP";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.CORRELATIONREDUCERDISPATCH;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Sun Sep 30 20:41:01 2012
@@ -25,6 +25,7 @@ import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +62,7 @@ public class ExecReducer extends MapRedu
   private Reporter rp;
   private boolean abort = false;
   private boolean isTagged = false;
+  private boolean isOperationPathTagged = false;
   private long cntr = 0;
   private long nextCntr = 1;
 
@@ -116,6 +118,7 @@ public class ExecReducer extends MapRedu
     reducer.setParentOperators(null); // clear out any parents as reducer is the
     // root
     isTagged = gWork.getNeedsTagging();
+    isOperationPathTagged = gWork.getNeedsOperationPathTagging();
     try {
       keyTableDesc = gWork.getKeyDesc();
       inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
@@ -164,8 +167,9 @@ public class ExecReducer extends MapRedu
 
   private BytesWritable groupKey;
 
-  ArrayList<Object> row = new ArrayList<Object>(3);
+  List<Object> row = new ArrayList<Object>(4);
   ByteWritable tag = new ByteWritable();
+  ByteWritable operationPathTags = new ByteWritable();
 
   public void reduce(Object key, Iterator values, OutputCollector output,
       Reporter reporter) throws IOException {
@@ -188,6 +192,14 @@ public class ExecReducer extends MapRedu
         keyWritable.setSize(size);
       }
 
+      operationPathTags.set((byte)0);
+      if (isOperationPathTagged) {
+        // remove the operation path tag
+        int size = keyWritable.getSize() - 1;
+        operationPathTags.set(keyWritable.get()[size]);
+        keyWritable.setSize(size);
+      }
+
       if (!keyWritable.equals(groupKey)) {
         // If a operator wants to do some work at the beginning of a group
         if (groupKey == null) { // the first group
@@ -212,6 +224,7 @@ public class ExecReducer extends MapRedu
         l4j.trace("Start Group");
         reducer.startGroup();
         reducer.setGroupKeyObject(keyObject);
+        reducer.setBytesWritableGroupKey(groupKey);
       }
       // System.err.print(keyObject.toString());
       while (values.hasNext()) {
@@ -234,6 +247,7 @@ public class ExecReducer extends MapRedu
         row.add(valueObject[tag.get()]);
         // The tag is not used any more, we should remove it.
         row.add(tag);
+        row.add(operationPathTags);
         if (isLogInfoEnabled) {
           cntr++;
           if (cntr == nextCntr) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Sun Sep 30 20:41:01 2012
@@ -144,6 +144,13 @@ public class GroupByOperator extends Ope
   private long maxMemory;
   private float memoryThreshold;
 
+  private boolean forcedForward;  // only used by CorrelationReducerDispatchOperator to keep
+                                     // this GroupByOperator in step with the other
+                                     // GroupByOperators and JoinOperators.
+                                     // If true, the data associated with currentKeys (if
+                                     // any) is forwarded and currentKeys is reset; the row
+                                     // passed to processAggr is not aggregated.
+
   /**
    * This is used to store the position and field names for variable length
    * fields.
@@ -385,6 +392,7 @@ public class GroupByOperator extends Ope
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = this.getConf().getMemoryThreshold();
+    forcedForward = false;
     initializeChildren(hconf);
   }
 
@@ -793,6 +801,10 @@ public class GroupByOperator extends Ope
     }
   }
 
+  public void setForcedForward(boolean forcedForward) {
+    this.forcedForward = forcedForward;
+  }
+
   // Non-hash aggregation
   private void processAggr(Object row, ObjectInspector rowInspector,
       KeyWrapper newKeys) throws HiveException {
@@ -806,11 +818,16 @@ public class GroupByOperator extends Ope
         newKeys.equals(currentKeys) : false;
 
     // Forward the current keys if needed for sort-based aggregation
-    if (currentKeys != null && !keysAreEqual) {
+    if (currentKeys != null && (!keysAreEqual || forcedForward)) {
       forward(currentKeys.getKeyArray(), aggregations);
       countAfterReport = 0;
     }
 
+    if (forcedForward) {
+      currentKeys = null;
+      return;
+    }
+
     // Need to update the keys?
     if (currentKeys == null || !keysAreEqual) {
       if (currentKeys == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sun Sep 30 20:41:01 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.ap
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -1415,4 +1416,52 @@ public abstract class Operator<T extends
   public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
     this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
   }
+
+  // bytesWritableGroupKey is only used when a query plan is optimized by CorrelationOptimizer.
+  // CorrelationLocalSimulativeReduceSinkOperator will use this variable to determine when it needs to start or end the group
+  // for its child operator.
+  protected BytesWritable bytesWritableGroupKey;
+
+  public void setBytesWritableGroupKey(BytesWritable groupKey) {
+    if (bytesWritableGroupKey == null) {
+      bytesWritableGroupKey = new BytesWritable();
+    }
+    bytesWritableGroupKey.set(groupKey.get(), 0, groupKey.getSize());
+  }
+
+  public BytesWritable getBytesWritableGroupKey() {
+    return bytesWritableGroupKey;
+  }
+
+  // The number of the current row
+  protected long rowNumber;
+
+  public void initializeRowNumber() {
+    this.rowNumber = 0L;
+    LOG.info("Operator " + id + " " + getName() + " row number initialized to 0");
+    if (childOperators == null) {
+      return;
+    }
+    LOG.info("Initializing row numbers of children of " + id + " " + getName());
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      childOperatorsArray[i].initializeRowNumber();
+    }
+  }
+
+  public void setRowNumber(long rowNumber) throws HiveException {
+    this.rowNumber = rowNumber;
+    if (childOperators == null) {
+      return;
+    }
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      assert rowNumber >= childOperatorsArray[i].getRowNumber();
+      if (rowNumber != childOperatorsArray[i].getRowNumber()) {
+        childOperatorsArray[i].setRowNumber(rowNumber);
+      }
+    }
+  }
+
+  public long getRowNumber() {
+    return rowNumber;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Sun Sep 30 20:41:01 2012
@@ -22,6 +22,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationCompositeDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationLocalSimulativeReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.CorrelationReducerDispatchDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +94,12 @@ public final class OperatorFactory {
         HashTableDummyOperator.class));
     opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
         HashTableSinkOperator.class));
+    opvec.add(new OpTuple<CorrelationCompositeDesc>(CorrelationCompositeDesc.class,
+            CorrelationCompositeOperator.class));
+    opvec.add(new OpTuple<CorrelationReducerDispatchDesc>(CorrelationReducerDispatchDesc.class,
+        CorrelationReducerDispatchOperator.class));
+    opvec.add(new OpTuple<CorrelationLocalSimulativeReduceSinkDesc>(CorrelationLocalSimulativeReduceSinkDesc.class,
+        CorrelationLocalSimulativeReduceSinkOperator.class));
   }
 
   public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sun Sep 30 20:41:01 2012
@@ -21,179 +21,50 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
-public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+public class ReduceSinkOperator extends BaseReduceSinkOperator<ReduceSinkDesc>
     implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  /**
-   * The evaluators for the key columns. Key columns decide the sort order on
-   * the reducer side. Key columns are passed to the reducer in the "key".
-   */
-  protected transient ExprNodeEvaluator[] keyEval;
-  /**
-   * The evaluators for the value columns. Value columns are passed to reducer
-   * in the "value".
-   */
-  protected transient ExprNodeEvaluator[] valueEval;
-  /**
-   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
-   * Hive language). Partition columns decide the reducer that the current row
-   * goes to. Partition columns are not passed to reducer.
-   */
-  protected transient ExprNodeEvaluator[] partitionEval;
-
-  // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
-  // ready
-  transient Serializer keySerializer;
-  transient boolean keyIsText;
-  transient Serializer valueSerializer;
-  transient int tag;
-  transient byte[] tagByte = new byte[1];
-  transient protected int numDistributionKeys;
-  transient protected int numDistinctExprs;
-
-  @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-
-    try {
-      keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
-      int i = 0;
-      for (ExprNodeDesc e : conf.getKeyCols()) {
-        keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
-      }
-
-      numDistributionKeys = conf.getNumDistributionKeys();
-      distinctColIndices = conf.getDistinctColumnIndices();
-      numDistinctExprs = distinctColIndices.size();
-
-      valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
-      i = 0;
-      for (ExprNodeDesc e : conf.getValueCols()) {
-        valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
-      }
-
-      partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
-      i = 0;
-      for (ExprNodeDesc e : conf.getPartitionCols()) {
-        partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
-      }
+  private final List<Integer> operationPathTags = new ArrayList<Integer>(); // operation path tags
+  private final byte[] operationPathTagsByte = new byte[1];
 
-      tag = conf.getTag();
-      tagByte[0] = (byte) tag;
-      LOG.info("Using tag = " + tag);
-
-      TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
-          .newInstance();
-      keySerializer.initialize(null, keyTableDesc.getProperties());
-      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
-      TableDesc valueTableDesc = conf.getValueSerializeInfo();
-      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
-          .newInstance();
-      valueSerializer.initialize(null, valueTableDesc.getProperties());
-
-      firstRow = true;
-      initializeChildren(hconf);
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new RuntimeException(e);
+  public void setOperationPathTags(List<Integer> operationPathTags) {
+    this.operationPathTags.addAll(operationPathTags); // appends; previously stored tags are kept
+    // Bitmask: bit i is set when tag i is present. OR (not +=) so a repeated tag cannot carry.
+    int operationPathTagsInt = 0;
+    for (Integer operationPathTag: operationPathTags) {
+      operationPathTagsInt |= 1 << operationPathTag.intValue();
     }
+    operationPathTagsByte[0] = (byte) operationPathTagsInt; // only the low 8 bits survive the cast
   }
 
-  transient InspectableObject tempInspectableObject = new InspectableObject();
-  transient HiveKey keyWritable = new HiveKey();
-  transient Writable value;
-
-  transient StructObjectInspector keyObjectInspector;
-  transient StructObjectInspector valueObjectInspector;
-  transient ObjectInspector[] partitionObjectInspectors;
-
-  transient Object[][] cachedKeys;
-  transient Object[] cachedValues;
-  transient List<List<Integer>> distinctColIndices;
-
-  boolean firstRow;
-
-  transient Random random;
-
-  /**
-   * Initializes array of ExprNodeEvaluator. Adds Union field for distinct
-   * column indices for group by.
-   * Puts the return values into a StructObjectInspector with output column
-   * names.
-   *
-   * If distinctColIndices is empty, the object inspector is same as
-   * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
-   */
-  protected static StructObjectInspector initEvaluatorsAndReturnStruct(
-      ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
-      List<String> outputColNames,
-      int length, ObjectInspector rowInspector)
-      throws HiveException {
-    int inspectorLen = evals.length > length ? length + 1 : evals.length;
-    List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
-
-    // keys
-    ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
-    sois.addAll(Arrays.asList(fieldObjectInspectors));
-
-    if (evals.length > length) {
-      // union keys
-      List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
-      for (List<Integer> distinctCols : distinctColIndices) {
-        List<String> names = new ArrayList<String>();
-        List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
-        int numExprs = 0;
-        for (int i : distinctCols) {
-          names.add(HiveConf.getColumnInternalName(numExprs));
-          eois.add(evals[i].initialize(rowInspector));
-          numExprs++;
-        }
-        uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
-      }
-      UnionObjectInspector uoi =
-        ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
-      sois.add(uoi);
-    }
-    return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+  public List<Integer> getOperationPathTags() {
+    return this.operationPathTags; // the internal list itself, not a copy
   }
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
-      if (firstRow) {
-        firstRow = false;
+      if (isFirstRow) {
+        isFirstRow = false;
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
             conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -267,9 +138,18 @@ public class ReduceSinkOperator extends 
             keyWritable.set(key.getBytes(), 0, key.getLength());
           } else {
             int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
+            if (!this.getConf().getNeedsOperationPathTagging()) {
+              keyWritable.setSize(keyLength + 1);
+            } else {
+              keyWritable.setSize(keyLength + 2);
+            }
             System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
+            if (!this.getConf().getNeedsOperationPathTagging()) {
+              keyWritable.get()[keyLength] = tagByte[0];
+            } else {
+              keyWritable.get()[keyLength] = operationPathTagsByte[0];
+              keyWritable.get()[keyLength + 1] = tagByte[0];
+            }
           }
         } else {
           // Must be BytesWritable
@@ -279,9 +159,18 @@ public class ReduceSinkOperator extends 
             keyWritable.set(key.getBytes(), 0, key.getLength());
           } else {
             int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
+            if (!this.getConf().getNeedsOperationPathTagging()) {
+              keyWritable.setSize(keyLength + 1);
+            } else {
+              keyWritable.setSize(keyLength + 2);
+            }
             System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
+            if (!this.getConf().getNeedsOperationPathTagging()) {
+              keyWritable.get()[keyLength] = tagByte[0];
+            } else {
+              keyWritable.get()[keyLength] = operationPathTagsByte[0];
+              keyWritable.get()[keyLength + 1] = tagByte[0];
+            }
           }
         }
         keyWritable.setHashCode(keyHashCode);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Sun Sep 30 20:41:01 2012
@@ -80,6 +80,9 @@ public class TableScanOperator extends O
     if (conf != null && conf.isGatherStats()) {
       gatherStats(row);
     }
+    if (conf != null && conf.isForwardRowNumber()) {
+      setRowNumber(rowNumber+1);
+    }
     forward(row, inputObjInspectors[tag]);
   }
 
@@ -169,6 +172,12 @@ public class TableScanOperator extends O
     if (conf == null) {
       return;
     }
+
+    LOG.info(this.getName() + " forward row number " + conf.isForwardRowNumber());
+    if(conf.isForwardRowNumber()){
+      initializeRowNumber();
+    }
+
     if (!conf.isGatherStats()) {
       return;
     }