Posted to commits@hive.apache.org by jc...@apache.org on 2019/11/26 01:49:40 UTC

[hive] branch master updated: HIVE-22525: Refactor HiveOpConverter (Miklos Gergely reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new af9e9a9  HIVE-22525: Refactor HiveOpConverter (Miklos Gergely reviewed by Jesus Camacho Rodriguez)
af9e9a9 is described below

commit af9e9a98a551cb48cae607775777e32010157fd5
Author: miklosgergely <mg...@cloudera.com>
AuthorDate: Mon Nov 25 17:49:29 2019 -0800

    HIVE-22525: Refactor HiveOpConverter (Miklos Gergely reviewed by Jesus Camacho Rodriguez)
---
 .../calcite/translator/HiveOpConverter.java        | 1358 --------------------
 .../opconventer/HiveAggregateVisitor.java          |   35 +
 .../translator/opconventer/HiveFilterVisitor.java  |   65 +
 .../{ => opconventer}/HiveGBOpConvUtil.java        |   26 +-
 .../translator/opconventer/HiveOpConverter.java    |  150 +++
 .../opconventer/HiveOpConverterUtils.java          |  267 ++++
 .../translator/opconventer/HiveProjectVisitor.java |  213 +++
 .../translator/opconventer/HiveRelNodeVisitor.java |   40 +
 .../opconventer/HiveSortExchangeVisitor.java       |   74 ++
 .../opconventer/HiveSortLimitVisitor.java          |  154 +++
 .../opconventer/HiveTableFunctionScanVisitor.java  |  140 ++
 .../opconventer/HiveTableScanVisitor.java          |  125 ++
 .../translator/opconventer/HiveUnionVisitor.java   |  144 +++
 .../translator/opconventer/JoinVisitor.java        |  390 ++++++
 .../translator/opconventer/package-info.java       |   20 +
 .../hadoop/hive/ql/parse/CalcitePlanner.java       |    2 +-
 16 files changed, 1832 insertions(+), 1371 deletions(-)
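
For readers skimming this diff: the refactor replaces the 1,358-line monolithic
converter below with a slim HiveOpConverter that dispatches to one small visitor
class per RelNode type (the new files listed above). Those new files are not
reproduced in this message, so the following is only a minimal, self-contained
sketch of the dispatch pattern they presumably follow; Node, FilterVisitor and
friends are placeholder names, not the actual Hive classes:

    // Sketch of a per-node-type visitor split; placeholder types, not Hive's.
    interface Node {}
    class FilterNode implements Node {}
    class ProjectNode implements Node {}

    abstract class NodeVisitor<T extends Node> {
      abstract String visit(T node);
    }

    class FilterVisitor extends NodeVisitor<FilterNode> {
      @Override
      String visit(FilterNode node) { return "FIL"; }
    }

    class ProjectVisitor extends NodeVisitor<ProjectNode> {
      @Override
      String visit(ProjectNode node) { return "SEL"; }
    }

    class Dispatcher {
      // Mirrors the instanceof chain in the old dispatch() below, except each
      // branch delegates to a dedicated visitor instead of a local method.
      String dispatch(Node n) {
        if (n instanceof FilterNode) {
          return new FilterVisitor().visit((FilterNode) n);
        } else if (n instanceof ProjectNode) {
          return new ProjectVisitor().visit((ProjectNode) n);
        }
        throw new IllegalArgumentException("Unsupported node: " + n.getClass());
      }
    }

In the actual commit each visitor presumably returns an OpAttr and shares the
HiveOpConverter state (semanticAnalyzer, hiveConf, topOps) that the old class
kept in its fields.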

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
deleted file mode 100644
index b900e89..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ /dev/null
@@ -1,1358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer.calcite.translator;
-
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelDistribution.Type;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.FunctionInfo;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveMultiJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
-import org.apache.hadoop.hive.ql.parse.JoinCond;
-import org.apache.hadoop.hive.ql.parse.JoinType;
-import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec;
-import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
-import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator;
-import org.apache.hadoop.hive.ql.parse.ParseUtils;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.UnparseTranslator;
-import org.apache.hadoop.hive.ql.parse.WindowingComponentizer;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc;
-import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.LimitDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PTFDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.UDTFDesc;
-import org.apache.hadoop.hive.ql.plan.UnionDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-
-public class HiveOpConverter {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HiveOpConverter.class);
-
-  public enum HIVEAGGOPMODE {
-    NO_SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan1MR
-    SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan2MR
-    NO_SKEW_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlanMapAggrNoSkew
-    SKEW_MAP_SIDE_AGG // Corresponds to SemAnalyzer genGroupByPlanMapAggr2MR
-  }
-
-  // TODO: remove this after stashing only the required pieces from opconverter
-  private final SemanticAnalyzer                              semanticAnalyzer;
-  private final HiveConf                                      hiveConf;
-  private final UnparseTranslator                             unparseTranslator;
-  private final Map<String, TableScanOperator>                topOps;
-  private int                                                 uniqueCounter;
-
-  public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf,
-      UnparseTranslator unparseTranslator, Map<String, TableScanOperator> topOps) {
-    this.semanticAnalyzer = semanticAnalyzer;
-    this.hiveConf = hiveConf;
-    this.unparseTranslator = unparseTranslator;
-    this.topOps = topOps;
-    this.uniqueCounter = 0;
-  }
-
-  static class OpAttr {
-    final String                         tabAlias;
-    ImmutableList<Operator>              inputs;
-    ImmutableSet<Integer>                vcolsInCalcite;
-
-    OpAttr(String tabAlias, Set<Integer> vcols, Operator... inputs) {
-      this.tabAlias = tabAlias;
-      this.inputs = ImmutableList.copyOf(inputs);
-      this.vcolsInCalcite = ImmutableSet.copyOf(vcols);
-    }
-
-    private OpAttr clone(Operator... inputs) {
-      return new OpAttr(tabAlias, vcolsInCalcite, inputs);
-    }
-  }
-
-  private void handleTopLimit(Operator<?> rootOp) {
-    if (rootOp instanceof LimitOperator) {
-      // This can happen only for the topmost limit, not while visiting a LimitOperator,
-      // since that one may sit inside a subquery.
-      this.semanticAnalyzer.getQB().getParseInfo().setOuterQueryLimit(((LimitOperator) rootOp).getConf().getLimit());
-    }
-  }
-
-  public Operator convert(RelNode root) throws SemanticException {
-    OpAttr opAf = dispatch(root);
-    Operator rootOp = opAf.inputs.get(0);
-    handleTopLimit(rootOp);
-    return rootOp;
-  }
-
-  OpAttr dispatch(RelNode rn) throws SemanticException {
-    if (rn instanceof HiveTableScan) {
-      return visit((HiveTableScan) rn);
-    } else if (rn instanceof HiveProject) {
-      return visit((HiveProject) rn);
-    } else if (rn instanceof HiveMultiJoin) {
-      return visit((HiveMultiJoin) rn);
-    } else if (rn instanceof HiveJoin) {
-      return visit((HiveJoin) rn);
-    } else if (rn instanceof HiveSemiJoin) {
-      return visit((HiveSemiJoin) rn);
-    } else if (rn instanceof HiveFilter) {
-      return visit((HiveFilter) rn);
-    } else if (rn instanceof HiveSortLimit) {
-      return visit((HiveSortLimit) rn);
-    } else if (rn instanceof HiveUnion) {
-      return visit((HiveUnion) rn);
-    } else if (rn instanceof HiveSortExchange) {
-      return visit((HiveSortExchange) rn);
-    } else if (rn instanceof HiveAggregate) {
-      return visit((HiveAggregate) rn);
-    } else if (rn instanceof HiveTableFunctionScan) {
-      return visit((HiveTableFunctionScan) rn);
-    }
-    LOG.error(rn.getClass().getCanonicalName() + " operator translation not supported"
-        + " yet in return path.");
-    return null;
-  }
-
-  private OpAttr visit(HiveTableFunctionScan scanRel) throws SemanticException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + scanRel.getId() + ":"
-          + scanRel.getRelTypeName() + " with row type: [" + scanRel.getRowType() + "]");
-    }
-
-    RexCall call = (RexCall)scanRel.getCall();
-
-    RowResolver rowResolver = new RowResolver();
-    List<String> fieldNames = new ArrayList<>(scanRel.getRowType().getFieldNames());
-    List<String> exprNames = new ArrayList<>(fieldNames);
-    List<ExprNodeDesc> exprCols = new ArrayList<>();
-    Map<String, ExprNodeDesc> colExprMap = new HashMap<>();
-    for (int pos = 0; pos < call.getOperands().size(); pos++) {
-      ExprNodeConverter converter = new ExprNodeConverter(SemanticAnalyzer.DUMMY_TABLE, fieldNames.get(pos),
-          scanRel.getRowType(), scanRel.getRowType(), ((HiveTableScan)scanRel.getInput(0)).getPartOrVirtualCols(),
-          scanRel.getCluster().getTypeFactory(), true);
-      ExprNodeDesc exprCol = call.getOperands().get(pos).accept(converter);
-      colExprMap.put(exprNames.get(pos), exprCol);
-      exprCols.add(exprCol);
-
-      ColumnInfo columnInfo = new ColumnInfo(fieldNames.get(pos), exprCol.getWritableObjectInspector(), null, false);
-      rowResolver.put(columnInfo.getTabAlias(), columnInfo.getAlias(), columnInfo);
-    }
-
-    OpAttr inputOpAf = dispatch(scanRel.getInputs().get(0));
-    TableScanOperator op = (TableScanOperator)inputOpAf.inputs.get(0);
-    op.getConf().setRowLimit(1);
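-    // The backing scan is limited to a single row; the expression converter above
-    // resolves columns against SemanticAnalyzer.DUMMY_TABLE, the one-row dummy
-    // table that drives the UDTF.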
-
-    Operator<?> output = OperatorFactory.getAndMakeChild(new SelectDesc(exprCols, fieldNames, false),
-        new RowSchema(rowResolver.getRowSchema()), op);
-    output.setColumnExprMap(colExprMap);
-
-    Operator<?> funcOp = genUDTFPlan(call, fieldNames, output, rowResolver);
-
-    return new OpAttr(null, new HashSet<Integer>(), funcOp);
-  }
-
-  private Operator<?> genUDTFPlan(RexCall call, List<String> colAliases, Operator<?> input, RowResolver rowResolver)
-      throws SemanticException {
-    LOG.debug("genUDTFPlan, Col aliases: {}", colAliases);
-
-    GenericUDTF genericUDTF = createGenericUDTF(call);
-    StructObjectInspector rowOI = createStructObjectInspector(rowResolver, colAliases);
-    StructObjectInspector outputOI = genericUDTF.initialize(rowOI);
-    List<ColumnInfo> columnInfos = createColumnInfos(outputOI);
-
-    // Add the UDTFOperator to the operator DAG
-    return OperatorFactory.getAndMakeChild(new UDTFDesc(genericUDTF, false), new RowSchema(columnInfos), input);
-  }
-
-  private GenericUDTF createGenericUDTF(RexCall call) throws SemanticException {
-    String functionName = call.getOperator().getName();
-    FunctionInfo fi = FunctionRegistry.getFunctionInfo(functionName);
-    return fi.getGenericUDTF();
-  }
-
-  private StructObjectInspector createStructObjectInspector(RowResolver rowResolver, List<String> colAliases)
-      throws SemanticException {
-    // Create the object inspector for the input columns and initialize the UDTF
-    List<String> colNames = rowResolver.getColumnInfos().stream().map(ci -> ci.getInternalName())
-        .collect(Collectors.toList());
-    List<ObjectInspector> colOIs = rowResolver.getColumnInfos().stream().map(ci -> ci.getObjectInspector())
-        .collect(Collectors.toList());
-
-    return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs);
-  }
-
-  private List<ColumnInfo> createColumnInfos(StructObjectInspector outputOI) {
-    // Generate the output column info's / row resolver using internal names.
-    List<ColumnInfo> columnInfos = new ArrayList<>();
-    for (StructField sf : outputOI.getAllStructFieldRefs()) {
-
-      // Since the UDTF operator feeds into a LVJ operator that will rename all the internal names, we can just use
-      // field name from the UDTF's OI as the internal name
-      ColumnInfo col = new ColumnInfo(sf.getFieldName(),
-          TypeInfoUtils.getTypeInfoFromObjectInspector(sf.getFieldObjectInspector()), null, false);
-      columnInfos.add(col);
-    }
-    return columnInfos;
-  }
-
-  /**
-   * TODO: 1. PPD needs to get pushed into TS
-   *
-   * @param scanRel the table scan to translate
-   * @return the operator attributes wrapping the generated TableScanOperator
-   */
-  OpAttr visit(HiveTableScan scanRel) {
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName()
-          + " with row type: [" + scanRel.getRowType() + "]");
-    }
-
-    RelOptHiveTable ht = (RelOptHiveTable) scanRel.getTable();
-
-    // 1. Setup TableScan Desc
-    // 1.1 Build col details used by scan
-    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>();
-    List<VirtualColumn> virtualCols = new ArrayList<VirtualColumn>();
-    List<Integer> neededColumnIDs = new ArrayList<Integer>();
-    List<String> neededColumnNames = new ArrayList<String>();
-    Set<Integer> vcolsInCalcite = new HashSet<Integer>();
-
-    List<String> partColNames = new ArrayList<String>();
-    Map<Integer, VirtualColumn> VColsMap = HiveCalciteUtil.getVColsMap(ht.getVirtualCols(),
-        ht.getNoOfNonVirtualCols());
-    Map<Integer, ColumnInfo> posToPartColInfo = ht.getPartColInfoMap();
-    Map<Integer, ColumnInfo> posToNonPartColInfo = ht.getNonPartColInfoMap();
-    List<Integer> neededColIndxsFrmReloptHT = scanRel.getNeededColIndxsFrmReloptHT();
-    List<String> scanColNames = scanRel.getRowType().getFieldNames();
-    String tableAlias = scanRel.getConcatQbIDAlias();
-
-    String colName;
-    ColumnInfo colInfo;
-    VirtualColumn vc;
-
-    for (int index = 0; index < scanRel.getRowType().getFieldList().size(); index++) {
-      colName = scanColNames.get(index);
-      if (VColsMap.containsKey(index)) {
-        vc = VColsMap.get(index);
-        virtualCols.add(vc);
-        colInfo = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true, vc.getIsHidden());
-        vcolsInCalcite.add(index);
-      } else if (posToPartColInfo.containsKey(index)) {
-        partColNames.add(colName);
-        colInfo = posToPartColInfo.get(index);
-        vcolsInCalcite.add(index);
-      } else {
-        colInfo = posToNonPartColInfo.get(index);
-      }
-      colInfos.add(colInfo);
-      if (neededColIndxsFrmReloptHT.contains(index)) {
-        neededColumnIDs.add(index);
-        neededColumnNames.add(colName);
-      }
-    }
-
-    // 1.2 Create TableScanDesc
-    TableScanDesc tsd = new TableScanDesc(tableAlias, virtualCols, ht.getHiveTableMD());
-
-    // 1.3. Set Partition cols in TSDesc
-    tsd.setPartColumns(partColNames);
-
-    // 1.4. Set needed cols in TSDesc
-    tsd.setNeededColumnIDs(neededColumnIDs);
-    tsd.setNeededColumns(neededColumnNames);
-
-    // 2. Setup TableScan
-    TableScanOperator ts = (TableScanOperator) OperatorFactory.get(
-        semanticAnalyzer.getOpContext(), tsd, new RowSchema(colInfos));
-
-    // Now that we let Calcite process subqueries, we might have more than one
-    // table scan with the same alias.
-    if (topOps.get(tableAlias) != null) {
-      tableAlias = tableAlias + this.uniqueCounter;
-    }
-    topOps.put(tableAlias, ts);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + ts + " with row schema: [" + ts.getSchema() + "]");
-    }
-
-    return new OpAttr(tableAlias, vcolsInCalcite, ts);
-  }
-
-  OpAttr visit(HiveProject projectRel) throws SemanticException {
-    OpAttr inputOpAf = dispatch(projectRel.getInput());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + projectRel.getId() + ":"
-          + projectRel.getRelTypeName() + " with row type: [" + projectRel.getRowType() + "]");
-    }
-
-    WindowingSpec windowingSpec = new WindowingSpec();
-    List<String> exprNames = new ArrayList<String>(projectRel.getRowType().getFieldNames());
-    List<ExprNodeDesc> exprCols = new ArrayList<ExprNodeDesc>();
-    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    for (int pos = 0; pos < projectRel.getChildExps().size(); pos++) {
-      ExprNodeConverter converter = new ExprNodeConverter(inputOpAf.tabAlias, projectRel
-          .getRowType().getFieldNames().get(pos), projectRel.getInput().getRowType(),
-          projectRel.getRowType(), inputOpAf.vcolsInCalcite, projectRel.getCluster().getTypeFactory(),
-          true);
-      ExprNodeDesc exprCol = projectRel.getChildExps().get(pos).accept(converter);
-      colExprMap.put(exprNames.get(pos), exprCol);
-      exprCols.add(exprCol);
-      // TODO: should columns that come through a PTF retain their virtual-ness?
-      if (converter.getWindowFunctionSpec() != null) {
-        for (WindowFunctionSpec wfs : converter.getWindowFunctionSpec()) {
-          windowingSpec.addWindowFunction(wfs);
-        }
-      }
-    }
-    if (windowingSpec.getWindowExpressions() != null
-        && !windowingSpec.getWindowExpressions().isEmpty()) {
-      inputOpAf = genPTF(inputOpAf, windowingSpec);
-    }
-    // TODO: is this a safe assumption (name collision, external names...)
-    SelectDesc sd = new SelectDesc(exprCols, exprNames);
-    Pair<ArrayList<ColumnInfo>, Set<Integer>> colInfoVColPair = createColInfos(
-        projectRel.getChildExps(), exprCols, exprNames, inputOpAf);
-    SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(sd, new RowSchema(
-        colInfoVColPair.getKey()), inputOpAf.inputs.get(0));
-    selOp.setColumnExprMap(colExprMap);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + selOp + " with row schema: [" + selOp.getSchema() + "]");
-    }
-
-    return new OpAttr(inputOpAf.tabAlias, colInfoVColPair.getValue(), selOp);
-  }
-
-  OpAttr visit(HiveMultiJoin joinRel) throws SemanticException {
-    return translateJoin(joinRel);
-  }
-
-  OpAttr visit(HiveJoin joinRel) throws SemanticException {
-    return translateJoin(joinRel);
-  }
-
-  OpAttr visit(HiveSemiJoin joinRel) throws SemanticException {
-    return translateJoin(joinRel);
-  }
-
-  private String getHiveDerivedTableAlias() {
-    return "$hdt$_" + (this.uniqueCounter++);
-  }
-
-  private OpAttr translateJoin(RelNode joinRel) throws SemanticException {
-    // 0. Additional data structures needed for the join optimization
-    // through Hive
-    String[] baseSrc = new String[joinRel.getInputs().size()];
-    String tabAlias = getHiveDerivedTableAlias();
-
-    // 1. Convert inputs
-    OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()];
-    List<Operator<?>> children = new ArrayList<Operator<?>>(joinRel.getInputs().size());
-    for (int i = 0; i < inputs.length; i++) {
-      inputs[i] = dispatch(joinRel.getInput(i));
-      children.add(inputs[i].inputs.get(0));
-      baseSrc[i] = inputs[i].tabAlias;
-    }
-
-    // 2. Generate tags
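-    // Each input reaches the join through a ReduceSinkOperator (built when its
-    // HiveSortExchange was visited); the tag marks which side of the join a row
-    // comes from.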
-    for (int tag=0; tag<children.size(); tag++) {
-      ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) children.get(tag);
-      reduceSinkOp.getConf().setTag(tag);
-    }
-
-    // 3. Virtual columns
-    Set<Integer> newVcolsInCalcite = new HashSet<Integer>();
-    newVcolsInCalcite.addAll(inputs[0].vcolsInCalcite);
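-    // A semijoin only emits the left input's columns; for every other join the
-    // output concatenates the input schemas, so shift each later input's virtual
-    // column indexes by the total width of the inputs before it.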
-    if (joinRel instanceof HiveMultiJoin ||
-            !((joinRel instanceof Join) && ((Join) joinRel).isSemiJoin())) {
-      int shift = inputs[0].inputs.get(0).getSchema().getSignature().size();
-      for (int i = 1; i < inputs.length; i++) {
-        newVcolsInCalcite.addAll(HiveCalciteUtil.shiftVColsSet(inputs[i].vcolsInCalcite, shift));
-        shift += inputs[i].inputs.get(0).getSchema().getSignature().size();
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + joinRel.getId() + ":" + joinRel.getRelTypeName()
-          + " with row type: [" + joinRel.getRowType() + "]");
-    }
-
-    // 4. Extract join key expressions from HiveSortExchange
-    ExprNodeDesc[][] joinExpressions = new ExprNodeDesc[inputs.length][];
-    for (int i = 0; i < inputs.length; i++) {
-      joinExpressions[i] = ((HiveSortExchange) joinRel.getInput(i)).getJoinExpressions();
-    }
-
-    // 5. Extract rest of join predicate info. We infer the rest of join condition
-    //    that will be added to the filters (join conditions that are not part of
-    //    the join key)
-    List<RexNode> joinFilters;
-    if (joinRel instanceof HiveJoin) {
-      joinFilters = ImmutableList.of(((HiveJoin)joinRel).getJoinFilter());
-    } else if (joinRel instanceof HiveMultiJoin){
-      joinFilters = ((HiveMultiJoin)joinRel).getJoinFilters();
-    } else if (joinRel instanceof HiveSemiJoin){
-      joinFilters = ImmutableList.of(((HiveSemiJoin)joinRel).getJoinFilter());
-    } else {
-      throw new SemanticException ("Can't handle join type: " + joinRel.getClass().getName());
-    }
-    List<List<ExprNodeDesc>> filterExpressions = Lists.newArrayList();
-    for (int i = 0; i< joinFilters.size(); i++) {
-      List<ExprNodeDesc> filterExpressionsForInput = new ArrayList<ExprNodeDesc>();
-      if (joinFilters.get(i) != null) {
-        for (RexNode conj : RelOptUtil.conjunctions(joinFilters.get(i))) {
-          ExprNodeDesc expr = convertToExprNode(conj, joinRel, null, newVcolsInCalcite);
-          filterExpressionsForInput.add(expr);
-        }
-      }
-      filterExpressions.add(filterExpressionsForInput);
-    }
-
-    // 6. Generate Join operator
-    JoinOperator joinOp = genJoin(joinRel, joinExpressions, filterExpressions, children,
-            baseSrc, tabAlias);
-
-    // 7. Return result
-    return new OpAttr(tabAlias, newVcolsInCalcite, joinOp);
-  }
-
-  OpAttr visit(HiveAggregate aggRel) throws SemanticException {
-    OpAttr inputOpAf = dispatch(aggRel.getInput());
-    return HiveGBOpConvUtil.translateGB(inputOpAf, aggRel, hiveConf);
-  }
-
-  OpAttr visit(HiveSortLimit sortRel) throws SemanticException {
-    OpAttr inputOpAf = dispatch(sortRel.getInput());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
-          + " with row type: [" + sortRel.getRowType() + "]");
-      if (sortRel.getCollation() == RelCollations.EMPTY) {
-        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
-            + " consists of limit");
-      } else if (sortRel.fetch == null) {
-        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
-            + " consists of sort");
-      } else {
-        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
-            + " consists of sort+limit");
-      }
-    }
-
-    Operator<?> inputOp = inputOpAf.inputs.get(0);
-    Operator<?> resultOp = inputOpAf.inputs.get(0);
-
-    // 1. If we need to sort tuples based on the value of some
-    // of their columns
-    if (sortRel.getCollation() != RelCollations.EMPTY) {
-
-      // In strict mode, in the presence of order by, limit must be specified.
-      if (sortRel.fetch == null) {
-        String error = StrictChecks.checkNoLimit(hiveConf);
-        if (error != null) throw new SemanticException(error);
-      }
-
-      // 1.a. Extract order for each column from collation
-      // Generate sortCols and order
-      ImmutableBitSet.Builder sortColsPosBuilder = ImmutableBitSet.builder();
-      ImmutableBitSet.Builder sortOutputColsPosBuilder = ImmutableBitSet.builder();
-      Map<Integer, RexNode> obRefToCallMap = sortRel.getInputRefToCallMap();
-      List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
-      StringBuilder order = new StringBuilder();
-      StringBuilder nullOrder = new StringBuilder();
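-      // Per sort column: order holds '+' (ascending) or '-' (descending);
-      // nullOrder holds 'a' (nulls first) or 'z' (nulls last).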
-      for (RelFieldCollation sortInfo : sortRel.getCollation().getFieldCollations()) {
-        int sortColumnPos = sortInfo.getFieldIndex();
-        ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature()
-            .get(sortColumnPos));
-        ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(),
-            columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol());
-        sortCols.add(sortColumn);
-        if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) {
-          order.append("-");
-        } else {
-          order.append("+");
-        }
-        if (sortInfo.nullDirection == RelFieldCollation.NullDirection.FIRST) {
-          nullOrder.append("a");
-        } else if (sortInfo.nullDirection == RelFieldCollation.NullDirection.LAST) {
-          nullOrder.append("z");
-        } else {
-          // Default
-          nullOrder.append(sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING ? "z" : "a");
-        }
-
-        if (obRefToCallMap != null) {
-          RexNode obExpr = obRefToCallMap.get(sortColumnPos);
-          sortColsPosBuilder.set(sortColumnPos);
-          if (obExpr == null) {
-            sortOutputColsPosBuilder.set(sortColumnPos);
-          }
-        }
-      }
-      // Use only 1 reducer for order by
-      int numReducers = 1;
-
-      // We keep only the columns that are part of the final output
-      List<String> keepColumns = new ArrayList<String>();
-      final ImmutableBitSet sortColsPos = sortColsPosBuilder.build();
-      final ImmutableBitSet sortOutputColsPos = sortOutputColsPosBuilder.build();
-      final List<ColumnInfo> inputSchema = inputOp.getSchema().getSignature();
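-      // Keep a column if it is a sort column that also appears in the output, or
-      // not a sort column at all; sort columns that exist only to evaluate
-      // ORDER BY expressions are dropped.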
-      for (int pos=0; pos<inputSchema.size(); pos++) {
-        if ((sortColsPos.get(pos) && sortOutputColsPos.get(pos)) ||
-                (!sortColsPos.get(pos) && !sortOutputColsPos.get(pos))) {
-          keepColumns.add(inputSchema.get(pos).getInternalName());
-        }
-      }
-
-      // 1.b. Generate reduce sink and project operator
-      resultOp = genReduceSinkAndBacktrackSelect(resultOp,
-          sortCols.toArray(new ExprNodeDesc[sortCols.size()]), 0, new ArrayList<ExprNodeDesc>(),
-          order.toString(), nullOrder.toString(), numReducers, Operation.NOT_ACID, hiveConf, keepColumns);
-    }
-
-    // 2. If we need to generate limit
-    if (sortRel.fetch != null) {
-      int limit = RexLiteral.intValue(sortRel.fetch);
-      int offset = sortRel.offset == null ? 0 : RexLiteral.intValue(sortRel.offset);
-      LimitDesc limitDesc = new LimitDesc(offset,limit);
-      ArrayList<ColumnInfo> cinfoLst = createColInfos(resultOp);
-      resultOp = OperatorFactory.getAndMakeChild(limitDesc, new RowSchema(cinfoLst), resultOp);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
-      }
-    }
-
-    // 3. Return result
-    return inputOpAf.clone(resultOp);
-  }
-
-  /**
-   * TODO: 1) isSamplingPred 2) sampleDesc 3) isSortedFilter
-   */
-  OpAttr visit(HiveFilter filterRel) throws SemanticException {
-    OpAttr inputOpAf = dispatch(filterRel.getInput());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + filterRel.getId() + ":" + filterRel.getRelTypeName()
-          + " with row type: [" + filterRel.getRowType() + "]");
-    }
-
-    ExprNodeDesc filCondExpr = filterRel.getCondition().accept(
-        new ExprNodeConverter(inputOpAf.tabAlias, filterRel.getInput().getRowType(), inputOpAf.vcolsInCalcite,
-            filterRel.getCluster().getTypeFactory(), true));
-    FilterDesc filDesc = new FilterDesc(filCondExpr, false);
-    ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOpAf.inputs.get(0));
-    FilterOperator filOp = (FilterOperator) OperatorFactory.getAndMakeChild(filDesc,
-        new RowSchema(cinfoLst), inputOpAf.inputs.get(0));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + filOp + " with row schema: [" + filOp.getSchema() + "]");
-    }
-
-    return inputOpAf.clone(filOp);
-  }
-
-  // Use this function to make the union "flat", for both execution and
-  // explain purposes.
-  private List<RelNode> extractRelNodeFromUnion(HiveUnion unionRel) {
-    List<RelNode> ret = new ArrayList<RelNode>();
-    for (RelNode input : unionRel.getInputs()) {
-      if (input instanceof HiveUnion) {
-        ret.addAll(extractRelNodeFromUnion((HiveUnion) input));
-      } else {
-        ret.add(input);
-      }
-    }
-    return ret;
-  }
-
-  OpAttr visit(HiveUnion unionRel) throws SemanticException {
-    // 1. Convert inputs
-    List<RelNode> inputsList = extractRelNodeFromUnion(unionRel);
-    OpAttr[] inputs = new OpAttr[inputsList.size()];
-    for (int i = 0; i < inputs.length; i++) {
-      inputs[i] = dispatch(inputsList.get(i));
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + unionRel.getId() + ":" + unionRel.getRelTypeName()
-          + " with row type: [" + unionRel.getRowType() + "]");
-    }
-
-    // 2. Create a new union operator
-    UnionDesc unionDesc = new UnionDesc();
-    unionDesc.setNumInputs(inputs.length);
-    String tableAlias = getHiveDerivedTableAlias();
-    ArrayList<ColumnInfo> cinfoLst = createColInfos(inputs[0].inputs.get(0), tableAlias);
-    Operator<?>[] children = new Operator<?>[inputs.length];
-    for (int i = 0; i < children.length; i++) {
-      if (i == 0) {
-        children[i] = inputs[i].inputs.get(0);
-      } else {
-        Operator<?> op = inputs[i].inputs.get(0);
-        // We need to check whether the other union input branches follow the first branch;
-        // we may need to cast the data types of specific columns.
-        children[i] = genInputSelectForUnion(op, cinfoLst);
-      }
-    }
-    Operator<? extends OperatorDesc> unionOp = OperatorFactory.getAndMakeChild(
-        semanticAnalyzer.getOpContext(), unionDesc, new RowSchema(cinfoLst), children);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + unionOp + " with row schema: [" + unionOp.getSchema() + "]");
-    }
-
-    // TODO: can columns retain their virtual-ness across a union?
-    // 3. Return result
-    return new OpAttr(tableAlias, inputs[0].vcolsInCalcite, unionOp);
-
-  }
-
-  OpAttr visit(HiveSortExchange exchangeRel) throws SemanticException {
-    OpAttr inputOpAf = dispatch(exchangeRel.getInput());
-    String tabAlias = inputOpAf.tabAlias;
-    if (tabAlias == null || tabAlias.length() == 0) {
-      tabAlias = getHiveDerivedTableAlias();
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translating operator rel#" + exchangeRel.getId() + ":"
-          + exchangeRel.getRelTypeName() + " with row type: [" + exchangeRel.getRowType() + "]");
-    }
-
-    RelDistribution distribution = exchangeRel.getDistribution();
-    if (distribution.getType() != Type.HASH_DISTRIBUTED) {
-      throw new SemanticException("Only hash distribution supported for LogicalExchange");
-    }
-    ExprNodeDesc[] expressions = new ExprNodeDesc[exchangeRel.getJoinKeys().size()];
-    for (int index = 0; index < exchangeRel.getJoinKeys().size(); index++) {
-      expressions[index] = convertToExprNode(exchangeRel.getJoinKeys().get(index),
-          exchangeRel.getInput(), inputOpAf.tabAlias, inputOpAf);
-    }
-    exchangeRel.setJoinExpressions(expressions);
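-    // Stash the converted join keys on the exchange; translateJoin retrieves
-    // them later via getJoinExpressions().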
-
-    ReduceSinkOperator rsOp = genReduceSink(inputOpAf.inputs.get(0), tabAlias, expressions,
-        -1, -1, Operation.NOT_ACID, hiveConf);
-
-    return new OpAttr(tabAlias, inputOpAf.vcolsInCalcite, rsOp);
-  }
-
-  private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticException {
-    Operator<?> input = inputOpAf.inputs.get(0);
-
-    wSpec.validateAndMakeEffective();
-    WindowingComponentizer groups = new WindowingComponentizer(wSpec);
-    RowResolver rr = new RowResolver();
-    for (ColumnInfo ci : input.getSchema().getSignature()) {
-      rr.put(inputOpAf.tabAlias, ci.getInternalName(), ci);
-    }
-
-    while (groups.hasNext()) {
-      wSpec = groups.next(hiveConf, semanticAnalyzer, unparseTranslator, rr);
-
-      // 1. Create RS and backtrack Select operator on top
-      ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
-      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
-      StringBuilder order = new StringBuilder();
-      StringBuilder nullOrder = new StringBuilder();
-
-      for (PartitionExpression partCol : wSpec.getQueryPartitionSpec().getExpressions()) {
-        ExprNodeDesc partExpr = semanticAnalyzer.genExprNodeDesc(partCol.getExpression(), rr);
-        if (ExprNodeDescUtils.indexOf(partExpr, partCols) < 0) {
-          keyCols.add(partExpr);
-          partCols.add(partExpr);
-          order.append('+');
-          nullOrder.append('a');
-        }
-      }
-
-      if (wSpec.getQueryOrderSpec() != null) {
-        for (OrderExpression orderCol : wSpec.getQueryOrderSpec().getExpressions()) {
-          ExprNodeDesc orderExpr = semanticAnalyzer.genExprNodeDesc(orderCol.getExpression(), rr);
-          char orderChar = orderCol.getOrder() == PTFInvocationSpec.Order.ASC ? '+' : '-';
-          char nullOrderChar = orderCol.getNullOrder() == PTFInvocationSpec.NullOrder.NULLS_FIRST ? 'a' : 'z';
-          int index = ExprNodeDescUtils.indexOf(orderExpr, keyCols);
-          if (index >= 0) {
-            order.setCharAt(index, orderChar);
-            nullOrder.setCharAt(index, nullOrderChar);
-            continue;
-          }
-          keyCols.add(orderExpr);
-          order.append(orderChar);
-          nullOrder.append(nullOrderChar);
-        }
-      }
-
-      SelectOperator selectOp = genReduceSinkAndBacktrackSelect(input,
-          keyCols.toArray(new ExprNodeDesc[keyCols.size()]), 0, partCols,
-          order.toString(), nullOrder.toString(), -1, Operation.NOT_ACID, hiveConf);
-
-      // 2. Finally create PTF
-      PTFTranslator translator = new PTFTranslator();
-      PTFDesc ptfDesc = translator.translate(wSpec, semanticAnalyzer, hiveConf, rr,
-          unparseTranslator);
-      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
-
-      Operator<?> ptfOp = OperatorFactory.getAndMakeChild(
-          ptfDesc, new RowSchema(ptfOpRR.getColumnInfos()), selectOp);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Generated " + ptfOp + " with row schema: [" + ptfOp.getSchema() + "]");
-      }
-
-      // 3. Prepare for next iteration (if any)
-      rr = ptfOpRR;
-      input = ptfOp;
-    }
-
-    return inputOpAf.clone(input);
-  }
-
-  private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input,
-          ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order,
-          String nullOrder, int numReducers, Operation acidOperation, HiveConf hiveConf)
-              throws SemanticException {
-    return genReduceSinkAndBacktrackSelect(input, keys, tag, partitionCols, order, nullOrder,
-        numReducers, acidOperation, hiveConf, input.getSchema().getColumnNames());
-  }
-
-  private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input,
-      ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order, String nullOrder,
-      int numReducers, Operation acidOperation, HiveConf hiveConf,
-      List<String> keepColNames) throws SemanticException {
-    // 1. Generate RS operator
-    // 1.1 Prune the table names, counting only those that are not empty strings,
-    // as an empty table alias is only allowed for virtual columns.
-    String tableAlias = null;
-    Set<String> tableNames = input.getSchema().getTableNames();
-    for (String tableName : tableNames) {
-      if (tableName != null) {
-        if (tableName.length() == 0) {
-          if (tableAlias == null) {
-            tableAlias = tableName;
-          }
-        } else {
-          if (tableAlias == null || tableAlias.length() == 0) {
-            tableAlias = tableName;
-          } else {
-            if (!tableName.equals(tableAlias)) {
-              throw new SemanticException(
-                  "In CBO return path, genReduceSinkAndBacktrackSelect is expecting only one tableAlias but there is more than one");
-            }
-          }
-        }
-      }
-    }
-    if (tableAlias == null) {
-      throw new SemanticException(
-          "In CBO return path, genReduceSinkAndBacktrackSelect is expecting only one tableAlias but there is none");
-    }
-    // 1.2 Now generate RS operator
-    ReduceSinkOperator rsOp = genReduceSink(input, tableAlias, keys, tag, partitionCols, order,
-            nullOrder, numReducers, acidOperation, hiveConf);
-
-    // 2. Generate backtrack Select operator
-    Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(keepColNames,
-        rsOp.getConf().getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(),
-        rsOp.getValueIndex(), input);
-    SelectDesc selectDesc = new SelectDesc(new ArrayList<ExprNodeDesc>(descriptors.values()),
-        new ArrayList<String>(descriptors.keySet()));
-    ArrayList<ColumnInfo> cinfoLst = createColInfosSubset(input, keepColNames);
-    SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(
-        selectDesc, new RowSchema(cinfoLst), rsOp);
-    selectOp.setColumnExprMap(descriptors);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + selectOp + " with row schema: [" + selectOp.getSchema() + "]");
-    }
-
-    return selectOp;
-  }
-
-  private static ReduceSinkOperator genReduceSink(Operator<?> input, String tableAlias, ExprNodeDesc[] keys, int tag,
-      int numReducers, Operation acidOperation, HiveConf hiveConf) throws SemanticException {
-    return genReduceSink(input, tableAlias, keys, tag, new ArrayList<ExprNodeDesc>(), "", "", numReducers,
-        acidOperation, hiveConf);
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private static ReduceSinkOperator genReduceSink(Operator<?> input, String tableAlias, ExprNodeDesc[] keys, int tag,
-      ArrayList<ExprNodeDesc> partitionCols, String order, String nullOrder, int numReducers,
-      Operation acidOperation, HiveConf hiveConf) throws SemanticException {
-    Operator dummy = Operator.createDummy(); // dummy for backtracking
-    dummy.setParentOperators(Arrays.asList(input));
-
-    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
-    ArrayList<ExprNodeDesc> reduceKeysBack = new ArrayList<ExprNodeDesc>();
-
-    // Compute join keys and store in reduceKeys
-    for (ExprNodeDesc key : keys) {
-      reduceKeys.add(key);
-      reduceKeysBack.add(ExprNodeDescUtils.backtrack(key, dummy, input));
-    }
-
-    // Walk over the input schema and copy in the output
-    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
-    ArrayList<ExprNodeDesc> reduceValuesBack = new ArrayList<ExprNodeDesc>();
-    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-
-    List<ColumnInfo> inputColumns = input.getSchema().getSignature();
-    ArrayList<ColumnInfo> outputColumns = new ArrayList<ColumnInfo>();
-    List<String> outputColumnNames = new ArrayList<String>();
-    int[] index = new int[inputColumns.size()];
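-    // index[i] records where input column i ends up: a value >= 0 is a position
-    // among the reduce keys, a negative value v stands for value column (-v - 1).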
-    for (int i = 0; i < inputColumns.size(); i++) {
-      ColumnInfo colInfo = inputColumns.get(i);
-      String outputColName = colInfo.getInternalName();
-      ExprNodeColumnDesc expr = new ExprNodeColumnDesc(colInfo);
-
-      // backtrack can be null when the input is a script operator
-      ExprNodeDesc exprBack = ExprNodeDescUtils.backtrack(expr, dummy, input);
-      int kindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceKeysBack);
-      if (kindex >= 0) {
-        ColumnInfo newColInfo = new ColumnInfo(colInfo);
-        newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
-        newColInfo.setAlias(outputColName);
-        newColInfo.setTabAlias(tableAlias);
-        outputColumns.add(newColInfo);
-        index[i] = kindex;
-        continue;
-      }
-      int vindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceValuesBack);
-      if (vindex >= 0) {
-        index[i] = -vindex - 1;
-        continue;
-      }
-      index[i] = -reduceValues.size() - 1;
-
-      reduceValues.add(expr);
-      reduceValuesBack.add(exprBack);
-
-      ColumnInfo newColInfo = new ColumnInfo(colInfo);
-      newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName);
-      newColInfo.setAlias(outputColName);
-      newColInfo.setTabAlias(tableAlias);
-
-      outputColumns.add(newColInfo);
-      outputColumnNames.add(outputColName);
-    }
-    dummy.setParentOperators(null);
-
-    // Use only 1 reducer if no reduce keys
-    if (reduceKeys.size() == 0) {
-      numReducers = 1;
-
-      // Cartesian product is not supported in strict mode
-      String error = StrictChecks.checkCartesian(hiveConf);
-      if (error != null) throw new SemanticException(error);
-    }
-
-    ReduceSinkDesc rsDesc;
-    if (order.isEmpty()) {
-      rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag,
-          reduceKeys.size(), numReducers, acidOperation);
-    } else {
-      rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag,
-          partitionCols, order, nullOrder, numReducers, acidOperation);
-    }
-
-    ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
-        rsDesc, new RowSchema(outputColumns), input);
-
-    List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
-    for (int i = 0; i < keyColNames.size(); i++) {
-      colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i));
-    }
-    List<String> valColNames = rsDesc.getOutputValueColumnNames();
-    for (int i = 0; i < valColNames.size(); i++) {
-      colExprMap.put(Utilities.ReduceField.VALUE + "." + valColNames.get(i), reduceValues.get(i));
-    }
-
-    rsOp.setValueIndex(index);
-    rsOp.setColumnExprMap(colExprMap);
-    rsOp.setInputAliases(input.getSchema().getTableNames()
-        .toArray(new String[input.getSchema().getTableNames().size()]));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + rsOp + " with row schema: [" + rsOp.getSchema() + "]");
-    }
-
-    return rsOp;
-  }
-
-  private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressions,
-      List<List<ExprNodeDesc>> filterExpressions, List<Operator<?>> children,
-      String[] baseSrc, String tabAlias)
-          throws SemanticException {
-
-    // 1. Extract join type
-    JoinCondDesc[] joinCondns;
-    boolean semiJoin;
-    boolean noOuterJoin;
-    if (join instanceof HiveMultiJoin) {
-      HiveMultiJoin hmj = (HiveMultiJoin) join;
-      joinCondns = new JoinCondDesc[hmj.getJoinInputs().size()];
-      for (int i = 0; i < hmj.getJoinInputs().size(); i++) {
-        joinCondns[i] = new JoinCondDesc(new JoinCond(
-                hmj.getJoinInputs().get(i).left,
-                hmj.getJoinInputs().get(i).right,
-                transformJoinType(hmj.getJoinTypes().get(i))));
-      }
-      semiJoin = false;
-      noOuterJoin = !hmj.isOuterJoin();
-    } else {
-      joinCondns = new JoinCondDesc[1];
-      semiJoin = (join instanceof Join) && ((Join) join).isSemiJoin();
-      JoinType joinType;
-      if (semiJoin) {
-        joinType = JoinType.LEFTSEMI;
-      } else {
-        joinType = transformJoinType(((Join)join).getJoinType());
-      }
-      joinCondns[0] = new JoinCondDesc(new JoinCond(0, 1, joinType));
-      noOuterJoin = joinType != JoinType.FULLOUTER && joinType != JoinType.LEFTOUTER
-              && joinType != JoinType.RIGHTOUTER;
-    }
-
-    // 2. We create the join aux structures
-    ArrayList<ColumnInfo> outputColumns = new ArrayList<ColumnInfo>();
-    ArrayList<String> outputColumnNames = new ArrayList<String>(join.getRowType()
-        .getFieldNames());
-    Operator<?>[] childOps = new Operator[children.size()];
-
-    Map<String, Byte> reversedExprs = new HashMap<String, Byte>();
-    Map<Byte, List<ExprNodeDesc>> exprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-    Map<Byte, List<ExprNodeDesc>> filters = new HashMap<Byte, List<ExprNodeDesc>>();
-    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    HashMap<Integer, Set<String>> posToAliasMap = new HashMap<Integer, Set<String>>();
-
-    int outputPos = 0;
-    for (int pos = 0; pos < children.size(); pos++) {
-      // 2.1. Backtracking from RS
-      ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos);
-      if (inputRS.getNumParent() != 1) {
-        throw new SemanticException("RS should have single parent");
-      }
-      Operator<?> parent = inputRS.getParentOperators().get(0);
-      ReduceSinkDesc rsDesc = inputRS.getConf();
-      int[] index = inputRS.getValueIndex();
-      Byte tag = (byte) rsDesc.getTag();
-
-      // 2.1.1. For a semijoin, inputs other than the first contribute no output columns
-      if (semiJoin && pos != 0) {
-        exprMap.put(tag, new ArrayList<ExprNodeDesc>());
-        childOps[pos] = inputRS;
-        continue;
-      }
-
-      posToAliasMap.put(pos, new HashSet<String>(inputRS.getSchema().getTableNames()));
-      List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
-      List<String> valColNames = rsDesc.getOutputValueColumnNames();
-
-      Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSinkForJoin(outputPos,
-          outputColumnNames, keyColNames, valColNames, index, parent, baseSrc[pos]);
-
-      List<ColumnInfo> parentColumns = parent.getSchema().getSignature();
-      for (int i = 0; i < index.length; i++) {
-        ColumnInfo info = new ColumnInfo(parentColumns.get(i));
-        info.setInternalName(outputColumnNames.get(outputPos));
-        info.setTabAlias(tabAlias);
-        outputColumns.add(info);
-        reversedExprs.put(outputColumnNames.get(outputPos), tag);
-        outputPos++;
-      }
-
-      exprMap.put(tag, new ArrayList<ExprNodeDesc>(descriptors.values()));
-      colExprMap.putAll(descriptors);
-      childOps[pos] = inputRS;
-    }
-
-    // 3. We populate the filters and filterMap structure needed in the join descriptor
-    List<List<ExprNodeDesc>> filtersPerInput = Lists.newArrayList();
-    int[][] filterMap = new int[children.size()][];
-    for (int i=0; i<children.size(); i++) {
-      filtersPerInput.add(new ArrayList<ExprNodeDesc>());
-    }
-    for (int i=0; i<filterExpressions.size(); i++) {
-      int leftPos = joinCondns[i].getLeft();
-      int rightPos = joinCondns[i].getRight();
-
-      for (ExprNodeDesc expr : filterExpressions.get(i)) {
-        // We need to update the exprNode, as currently
-        // they refer to columns in the output of the join;
-        // they should refer to the columns output by the RS
-        int inputPos = updateExprNode(expr, reversedExprs, colExprMap);
-        if (inputPos == -1) {
-          inputPos = leftPos;
-        }
-        filtersPerInput.get(inputPos).add(expr);
-
-        if (joinCondns[i].getType() == JoinDesc.FULL_OUTER_JOIN ||
-                joinCondns[i].getType() == JoinDesc.LEFT_OUTER_JOIN ||
-                joinCondns[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) {
-          if (inputPos == leftPos) {
-            updateFilterMap(filterMap, leftPos, rightPos);
-          } else {
-            updateFilterMap(filterMap, rightPos, leftPos);
-          }
-        }
-      }
-    }
-    for (int pos = 0; pos < children.size(); pos++) {
-      ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos);
-      ReduceSinkDesc rsDesc = inputRS.getConf();
-      Byte tag = (byte) rsDesc.getTag();
-      filters.put(tag, filtersPerInput.get(pos));
-    }
-
-    // 4. We create the join operator with its descriptor
-    JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns,
-            filters, joinExpressions, null);
-    desc.setReversedExprs(reversedExprs);
-    desc.setFilterMap(filterMap);
-
-    JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(
-        childOps[0].getCompilationOpContext(), desc, new RowSchema(outputColumns), childOps);
-    joinOp.setColumnExprMap(colExprMap);
-    joinOp.setPosToAliasMap(posToAliasMap);
-    joinOp.getConf().setBaseSrc(baseSrc);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated " + joinOp + " with row schema: [" + joinOp.getSchema() + "]");
-    }
-
-    return joinOp;
-  }
-
-  /*
-   * This method updates the input expr, changing all the
-   * ExprNodeColumnDesc in it to refer to columns given by the
-   * colExprMap.
-   *
-   * For instance, "col_0 = 1" would become "VALUE.col_0 = 1";
-   * the execution engine expects filters in the Join operators
-   * to be expressed that way.
-   */
-  private static int updateExprNode(ExprNodeDesc expr, final Map<String, Byte> reversedExprs,
-      final Map<String, ExprNodeDesc> colExprMap) throws SemanticException {
-    int inputPos = -1;
-    if (expr instanceof ExprNodeGenericFuncDesc) {
-      ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) expr;
-      List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>();
-      for (ExprNodeDesc functionChild : func.getChildren()) {
-        if (functionChild instanceof ExprNodeColumnDesc) {
-          String colRef = functionChild.getExprString();
-          int pos = reversedExprs.get(colRef);
-          if (pos != -1) {
-            if (inputPos == -1) {
-              inputPos = pos;
-            } else if (inputPos != pos) {
-              throw new SemanticException(
-                  "UpdateExprNode is expecting only one position for join operator convert. But there are more than one.");
-            }
-          }
-          newChildren.add(colExprMap.get(colRef));
-        } else {
-          int pos = updateExprNode(functionChild, reversedExprs, colExprMap);
-          if (pos != -1) {
-            if (inputPos == -1) {
-              inputPos = pos;
-            } else if (inputPos != pos) {
-              throw new SemanticException(
-                  "UpdateExprNode is expecting only one position for join operator convert. But there are more than one.");
-            }
-          }
-          newChildren.add(functionChild);
-        }
-      }
-      func.setChildren(newChildren);
-    }
-    return inputPos;
-  }
-
-  private static void updateFilterMap(int[][] filterMap, int inputPos, int joinPos) {
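-    // filterMap[inputPos] is a flattened list of (joinPos, count) pairs: even
-    // slots hold the other join input, odd slots count the filters recorded
-    // against it.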
-    int[] map = filterMap[inputPos];
-    if (map == null) {
-      filterMap[inputPos] = new int[2];
-      filterMap[inputPos][0] = joinPos;
-      filterMap[inputPos][1]++;
-    } else {
-      boolean inserted = false;
-      for (int j=0; j<map.length/2 && !inserted; j++) {
-        if (map[j*2] == joinPos) {
-          map[j*2+1]++;
-          inserted = true;
-        }
-      }
-      if (!inserted) {
-        int[] newMap = new int[map.length + 2];
-        System.arraycopy(map, 0, newMap, 0, map.length);
-        newMap[map.length] = joinPos;
-        newMap[map.length+1]++;
-        filterMap[inputPos] = newMap;
-      }
-    }
-  }
-
-  private static JoinType transformJoinType(JoinRelType type) {
-    JoinType resultJoinType;
-    switch (type) {
-    case FULL:
-      resultJoinType = JoinType.FULLOUTER;
-      break;
-    case LEFT:
-      resultJoinType = JoinType.LEFTOUTER;
-      break;
-    case RIGHT:
-      resultJoinType = JoinType.RIGHTOUTER;
-      break;
-    default:
-      resultJoinType = JoinType.INNER;
-      break;
-    }
-    return resultJoinType;
-  }
-
-  private static Map<String, ExprNodeDesc> buildBacktrackFromReduceSinkForJoin(int initialPos,
-      List<String> outputColumnNames, List<String> keyColNames, List<String> valueColNames,
-      int[] index, Operator<?> inputOp, String tabAlias) {
-    Map<String, ExprNodeDesc> columnDescriptors = new LinkedHashMap<String, ExprNodeDesc>();
-    for (int i = 0; i < index.length; i++) {
-      ColumnInfo info = new ColumnInfo(inputOp.getSchema().getSignature().get(i));
-      String field;
-      if (index[i] >= 0) {
-        field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
-      } else {
-        field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1);
-      }
-      ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), field, tabAlias,
-          info.getIsVirtualCol());
-      columnDescriptors.put(outputColumnNames.get(initialPos + i), desc);
-    }
-    return columnDescriptors;
-  }
-
-  private static Map<String, ExprNodeDesc> buildBacktrackFromReduceSink(List<String> keepColNames,
-      List<String> keyColNames, List<String> valueColNames, int[] index, Operator<?> inputOp) {
-    Map<String, ExprNodeDesc> columnDescriptors = new LinkedHashMap<String, ExprNodeDesc>();
-    int pos = 0;
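-    // keepColNames must be an order-preserving subset of the input schema's
-    // internal names; pos advances only when the next kept name matches.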
-    for (int i = 0; i < index.length; i++) {
-      ColumnInfo info = inputOp.getSchema().getSignature().get(i);
-      if (pos < keepColNames.size() &&
-              info.getInternalName().equals(keepColNames.get(pos))) {
-        String field;
-        if (index[i] >= 0) {
-          field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
-        } else {
-          field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1);
-        }
-        ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), field, info.getTabAlias(),
-            info.getIsVirtualCol());
-        columnDescriptors.put(keepColNames.get(pos), desc);
-        pos++;
-      }
-    }
-    return columnDescriptors;
-  }
-
-  private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias, OpAttr inputAttr) {
-    return convertToExprNode(rn, inputRel, tabAlias, inputAttr.vcolsInCalcite);
-  }
-
-  private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias,
-          Set<Integer> vcolsInCalcite) {
-    return rn.accept(new ExprNodeConverter(tabAlias, inputRel.getRowType(), vcolsInCalcite,
-        inputRel.getCluster().getTypeFactory(), true));
-  }
-
-  private static ArrayList<ColumnInfo> createColInfos(Operator<?> input) {
-    ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
-    for (ColumnInfo ci : input.getSchema().getSignature()) {
-      cInfoLst.add(new ColumnInfo(ci));
-    }
-    return cInfoLst;
-  }
-
-  //create column info with new tableAlias
-  private static ArrayList<ColumnInfo> createColInfos(Operator<?> input, String tableAlias) {
-    ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
-    for (ColumnInfo ci : input.getSchema().getSignature()) {
-      ColumnInfo copyOfColumnInfo = new ColumnInfo(ci);
-      copyOfColumnInfo.setTabAlias(tableAlias);
-      cInfoLst.add(copyOfColumnInfo);
-    }
-    return cInfoLst;
-  }
-
-  private static ArrayList<ColumnInfo> createColInfosSubset(Operator<?> input,
-          List<String> keepColNames) {
-    ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
-    int pos = 0;
-    for (ColumnInfo ci : input.getSchema().getSignature()) {
-      if (pos < keepColNames.size() &&
-              ci.getInternalName().equals(keepColNames.get(pos))) {
-        cInfoLst.add(new ColumnInfo(ci));
-        pos++;
-      }
-    }
-    return cInfoLst;
-  }
-
-  private static Pair<ArrayList<ColumnInfo>, Set<Integer>> createColInfos(
-      List<RexNode> calciteExprs, List<ExprNodeDesc> hiveExprs, List<String> projNames,
-      OpAttr inpOpAf) {
-    if (hiveExprs.size() != projNames.size()) {
-      throw new RuntimeException("Column expressions list doesn't match Column Names list");
-    }
-
-    RexNode rexN;
-    ExprNodeDesc pe;
-    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>();
-    boolean vc;
-    Set<Integer> newVColSet = new HashSet<Integer>();
-    for (int i = 0; i < hiveExprs.size(); i++) {
-      pe = hiveExprs.get(i);
-      rexN = calciteExprs.get(i);
-      vc = false;
-      if (rexN instanceof RexInputRef) {
-        if (inpOpAf.vcolsInCalcite.contains(((RexInputRef) rexN).getIndex())) {
-          newVColSet.add(i);
-          vc = true;
-        }
-      }
-      colInfos
-          .add(new ColumnInfo(projNames.get(i), pe.getTypeInfo(), inpOpAf.tabAlias, vc));
-    }
-
-    return new Pair<ArrayList<ColumnInfo>, Set<Integer>>(colInfos, newVColSet);
-  }
-
-  private Operator<? extends OperatorDesc> genInputSelectForUnion(
-      Operator<? extends OperatorDesc> origInputOp, ArrayList<ColumnInfo> uColumnInfo)
-      throws SemanticException {
-    Iterator<ColumnInfo> oIter = origInputOp.getSchema().getSignature().iterator();
-    Iterator<ColumnInfo> uIter = uColumnInfo.iterator();
-    List<ExprNodeDesc> columns = new ArrayList<ExprNodeDesc>();
-    List<String> colName = new ArrayList<String>();
-    Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
-    boolean needSelectOp = false;
-    while (oIter.hasNext()) {
-      ColumnInfo oInfo = oIter.next();
-      ColumnInfo uInfo = uIter.next();
-      if (!oInfo.isSameColumnForRR(uInfo)) {
-        needSelectOp = true;
-      }
-      ExprNodeDesc column = new ExprNodeColumnDesc(oInfo.getType(), oInfo.getInternalName(),
-          oInfo.getTabAlias(), oInfo.getIsVirtualCol(), oInfo.isSkewedCol());
-      if (!oInfo.getType().equals(uInfo.getType())) {
-        column = ParseUtils.createConversionCast(column, (PrimitiveTypeInfo) uInfo.getType());
-      }
-      columns.add(column);
-      colName.add(uInfo.getInternalName());
-      columnExprMap.put(uInfo.getInternalName(), column);
-    }
-    if (needSelectOp) {
-      return OperatorFactory.getAndMakeChild(new SelectDesc(
-          columns, colName), new RowSchema(uColumnInfo), columnExprMap, origInputOp);
-    } else {
-      return origInputOp;
-    }
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveAggregateVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveAggregateVisitor.java
new file mode 100644
index 0000000..d5a3812
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveAggregateVisitor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+class HiveAggregateVisitor extends HiveRelNodeVisitor<HiveAggregate> {
+  HiveAggregateVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(HiveAggregate aggRel) throws SemanticException {
+    OpAttr inputOpAf = hiveOpConverter.dispatch(aggRel.getInput());
+    return HiveGBOpConvUtil.translateGB(inputOpAf, aggRel, hiveOpConverter.getHiveConf());
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveFilterVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveFilterVisitor.java
new file mode 100644
index 0000000..b703eed
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveFilterVisitor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+
+class HiveFilterVisitor extends HiveRelNodeVisitor<HiveFilter> {
+  HiveFilterVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  /**
+   * TODO: 1) isSamplingPred 2) sampleDesc 3) isSortedFilter.
+   */
+  @Override
+  OpAttr visit(HiveFilter filterRel) throws SemanticException {
+    OpAttr inputOpAf = hiveOpConverter.dispatch(filterRel.getInput());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + filterRel.getId() + ":" + filterRel.getRelTypeName()
+          + " with row type: [" + filterRel.getRowType() + "]");
+    }
+
+    ExprNodeDesc filCondExpr = filterRel.getCondition().accept(
+        new ExprNodeConverter(inputOpAf.tabAlias, filterRel.getInput().getRowType(), inputOpAf.vcolsInCalcite,
+            filterRel.getCluster().getTypeFactory(), true));
+    FilterDesc filDesc = new FilterDesc(filCondExpr, false);
+    ArrayList<ColumnInfo> cinfoLst = HiveOpConverterUtils.createColInfos(inputOpAf.inputs.get(0));
+    FilterOperator filOp = (FilterOperator) OperatorFactory.getAndMakeChild(filDesc,
+        new RowSchema(cinfoLst), inputOpAf.inputs.get(0));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + filOp + " with row schema: [" + filOp.getSchema() + "]");
+    }
+
+    return inputOpAf.clone(filOp);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
similarity index 98%
rename from ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
rename to ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
index 268aca6..46ddffd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveGBOpConvUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.optimizer.calcite.translator;
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,7 +45,8 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
-import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.GenericUDAFInfo;
@@ -74,7 +75,11 @@ import com.google.common.collect.ImmutableList;
  * 4. VirtualColMap needs to be maintained
  *
  */
-public class HiveGBOpConvUtil {
+final class HiveGBOpConvUtil {
+
+  private HiveGBOpConvUtil() {
+    throw new UnsupportedOperationException("HiveGBOpConvUtil should not be instantiated!");
+  }
 
   private static enum HIVEGBPHYSICALMODE {
     MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB,
@@ -91,7 +96,6 @@ public class HiveGBOpConvUtil {
     private final ArrayList<ExprNodeDesc> udafParams                      = new ArrayList<ExprNodeDesc>();
     private List<Integer>           udafParamsIndxInGBInfoDistExprs = new ArrayList<Integer>();
     // We store the position of the argument for the function in the input.
-    private List<Integer> argList;
   };
 
   private static class GBInfo {
@@ -237,7 +241,6 @@ public class HiveGBOpConvUtil {
           inputOpAf.tabAlias);
       udafAttrs.udafParams.addAll(argExps);
       udafAttrs.udafName = aggCall.getAggregation().getName();
-      udafAttrs.argList = aggCall.getArgList();
       udafAttrs.isDistinctUDAF = aggCall.isDistinct();
       List<Integer> argLst = new ArrayList<Integer>(aggCall.getArgList());
       List<Integer> distColIndicesOfUDAF = new ArrayList<Integer>();
@@ -246,7 +249,7 @@ public class HiveGBOpConvUtil {
         // NOTE: distinct expr can be part of GB key
         if (udafAttrs.isDistinctUDAF) {
           ExprNodeDesc argExpr = argExps.get(i);
-          Integer found = ExprNodeDescUtils.indexOf(argExpr, gbInfo.gbKeys);
+          int found = ExprNodeDescUtils.indexOf(argExpr, gbInfo.gbKeys);
           distColIndicesOfUDAF.add(found < 0 ? distParamInRefsToOutputPos.get(argLst.get(i)) + gbInfo.gbKeys.size() +
               (gbInfo.grpSets.size() > 0 ? 1 : 0) : found);
           distUDAFParamsIndxInDistExprs.add(distParamInRefsToOutputPos.get(argLst.get(i)));
@@ -1022,8 +1025,7 @@ public class HiveGBOpConvUtil {
     // i.e., their positions can be mixed.
     // so for all UDAF we first check to see if it is groupby key, if not is it distinct key
     // if not it should be value
-    List<Integer> distinctPositions = new ArrayList<>();
-    Map<Integer, ArrayList<ExprNodeDesc>> indexToParameter = new TreeMap<>();
+    Map<Integer, List<ExprNodeDesc>> indexToParameter = new TreeMap<>();
     for (int i = 0; i < gbInfo.udafAttrs.size(); i++) {
       UDAFAttrs udafAttr = gbInfo.udafAttrs.get(i);
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
@@ -1055,15 +1057,15 @@ public class HiveGBOpConvUtil {
         numDistinctUDFs++;
       }
     }
-    for(int index : indexToParameter.keySet()){
-      UDAFAttrs udafAttr = gbInfo.udafAttrs.get(index);
+    for (Map.Entry<Integer, List<ExprNodeDesc>> e : indexToParameter.entrySet()) {
+      UDAFAttrs udafAttr = gbInfo.udafAttrs.get(e.getKey());
       Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, udafAttr.isDistinctUDAF);
       GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(udafAttr.udafEvaluator, udafMode,
-          indexToParameter.get(index));
+          e.getValue());
       aggregations.add(new AggregationDesc(udafAttr.udafName.toLowerCase(),
           udaf.genericUDAFEvaluator, udaf.convertedParameters, udafAttr.isDistinctUDAF, udafMode));
       if (useOriginalGBNames) {
-        colOutputName = gbInfo.outputColNames.get(udafColStartPosInOriginalGB + index);
+        colOutputName = gbInfo.outputColNames.get(udafColStartPosInOriginalGB + e.getKey());
       } else {
         colOutputName = SemanticAnalyzer.getColumnInternalName(gbKeys.size() + aggregations.size()
             - 1);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverter.java
new file mode 100644
index 0000000..bc2b742
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveMultiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.UnparseTranslator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class HiveOpConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveOpConverter.class);
+
+  // TODO: remove this after stashing only rqd pieces from opconverter
+  private final SemanticAnalyzer               semanticAnalyzer;
+  private final HiveConf                       hiveConf;
+  private final UnparseTranslator              unparseTranslator;
+  private final Map<String, TableScanOperator> topOps;
+  private int                                  uniqueCounter;
+
+  public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf,
+      UnparseTranslator unparseTranslator, Map<String, TableScanOperator> topOps) {
+    this.semanticAnalyzer = semanticAnalyzer;
+    this.hiveConf = hiveConf;
+    this.unparseTranslator = unparseTranslator;
+    this.topOps = topOps;
+    this.uniqueCounter = 0;
+  }
+
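+  /**
+   * Result of translating a RelNode: the generated operator(s) together with
+   * the originating table alias and the positions of the Calcite virtual columns.
+   */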
+  static class OpAttr {
+    final String               tabAlias;
+    ImmutableList<Operator<?>> inputs;
+    ImmutableSet<Integer>      vcolsInCalcite;
+
+    OpAttr(String tabAlias, Set<Integer> vcols, Operator<?>... inputs) {
+      this.tabAlias = tabAlias;
+      this.inputs = ImmutableList.copyOf(inputs);
+      this.vcolsInCalcite = ImmutableSet.copyOf(vcols);
+    }
+
+    OpAttr clone(Operator<?>... inputs) {
+      return new OpAttr(tabAlias, vcolsInCalcite, inputs);
+    }
+  }
+
+  public Operator<?> convert(RelNode root) throws SemanticException {
+    OpAttr opAf = dispatch(root);
+    Operator<?> rootOp = opAf.inputs.get(0);
+    handleTopLimit(rootOp);
+    return rootOp;
+  }
+
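+  /**
+   * Routes a RelNode to the visitor that translates it; logs an error and
+   * returns null for node types not yet supported on the return path.
+   */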
+  OpAttr dispatch(RelNode rn) throws SemanticException {
+    if (rn instanceof HiveTableScan) {
+      return new HiveTableScanVisitor(this).visit((HiveTableScan) rn);
+    } else if (rn instanceof HiveProject) {
+      return new HiveProjectVisitor(this).visit((HiveProject) rn);
+    } else if (rn instanceof HiveMultiJoin) {
+      return new JoinVisitor(this).visit((HiveMultiJoin) rn);
+    } else if (rn instanceof HiveJoin) {
+      return new JoinVisitor(this).visit((HiveJoin) rn);
+    } else if (rn instanceof HiveSemiJoin) {
+      return new JoinVisitor(this).visit((HiveSemiJoin) rn);
+    } else if (rn instanceof HiveFilter) {
+      return new HiveFilterVisitor(this).visit((HiveFilter) rn);
+    } else if (rn instanceof HiveSortLimit) {
+      return new HiveSortLimitVisitor(this).visit((HiveSortLimit) rn);
+    } else if (rn instanceof HiveUnion) {
+      return new HiveUnionVisitor(this).visit((HiveUnion) rn);
+    } else if (rn instanceof HiveSortExchange) {
+      return new HiveSortExchangeVisitor(this).visit((HiveSortExchange) rn);
+    } else if (rn instanceof HiveAggregate) {
+      return new HiveAggregateVisitor(this).visit((HiveAggregate) rn);
+    } else if (rn instanceof HiveTableFunctionScan) {
+      return new HiveTableFunctionScanVisitor(this).visit((HiveTableFunctionScan) rn);
+    }
+    LOG.error(rn.getClass().getCanonicalName() + " operator translation not supported yet in return path.");
+    return null;
+  }
+
+  private void handleTopLimit(Operator<?> rootOp) {
+    if (rootOp instanceof LimitOperator) {
+      // this can happen only on top most limit, not while visiting Limit Operator
+      // since that can be within subquery.
+      this.semanticAnalyzer.getQB().getParseInfo().setOuterQueryLimit(((LimitOperator) rootOp).getConf().getLimit());
+    }
+  }
+
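+  /** Generates a unique alias ($hdt$_0, $hdt$_1, ...) for derived tables. */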
+  String getHiveDerivedTableAlias() {
+    return "$hdt$_" + (this.uniqueCounter++);
+  }
+
+  SemanticAnalyzer getSemanticAnalyzer() {
+    return semanticAnalyzer;
+  }
+
+  HiveConf getHiveConf() {
+    return hiveConf;
+  }
+
+  UnparseTranslator getUnparseTranslator() {
+    return unparseTranslator;
+  }
+
+  Map<String, TableScanOperator> getTopOps() {
+    return topOps;
+  }
+
+  int getUniqueCounter() {
+    return uniqueCounter;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
new file mode 100644
index 0000000..9cc1712
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class HiveOpConverterUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveOpConverterUtils.class);
+
+  private HiveOpConverterUtils() {
+    throw new UnsupportedOperationException("HiveOpConverterUtils should not be instantiated!");
+  }
+
+  static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input,
+      ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order, String nullOrder,
+      int numReducers, Operation acidOperation, HiveConf hiveConf,
+      List<String> keepColNames) throws SemanticException {
+    // 1. Generate RS operator
+    // 1.1 Prune tableNames: only consider the names that are not empty strings,
+    // as an empty table alias is only allowed for virtual columns.
+    String tableAlias = null;
+    Set<String> tableNames = input.getSchema().getTableNames();
+    for (String tableName : tableNames) {
+      if (tableName != null) {
+        if (tableName.length() == 0) {
+          if (tableAlias == null) {
+            tableAlias = tableName;
+          }
+        } else {
+          if (tableAlias == null || tableAlias.length() == 0) {
+            tableAlias = tableName;
+          } else {
+            if (!tableName.equals(tableAlias)) {
+              throw new SemanticException("In CBO return path, genReduceSinkAndBacktrackSelect is expecting only " +
+                  "one tableAlias but there is more than one");
+            }
+          }
+        }
+      }
+    }
+    if (tableAlias == null) {
+      throw new SemanticException(
+          "In CBO return path, genReduceSinkAndBacktrackSelect is expecting only one tableAlias but there is none");
+    }
+    // 1.2 Now generate RS operator
+    ReduceSinkOperator rsOp = genReduceSink(input, tableAlias, keys, tag, partitionCols, order,
+            nullOrder, numReducers, acidOperation, hiveConf);
+
+    // 2. Generate backtrack Select operator
+    Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(keepColNames,
+        rsOp.getConf().getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(),
+        rsOp.getValueIndex(), input);
+    SelectDesc selectDesc = new SelectDesc(new ArrayList<ExprNodeDesc>(descriptors.values()),
+        new ArrayList<String>(descriptors.keySet()));
+    ArrayList<ColumnInfo> cinfoLst = createColInfosSubset(input, keepColNames);
+    SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+        selectDesc, new RowSchema(cinfoLst), rsOp);
+    selectOp.setColumnExprMap(descriptors);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + selectOp + " with row schema: [" + selectOp.getSchema() + "]");
+    }
+
+    return selectOp;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  static ReduceSinkOperator genReduceSink(Operator<?> input, String tableAlias, ExprNodeDesc[] keys, int tag,
+      ArrayList<ExprNodeDesc> partitionCols, String order, String nullOrder, int numReducers,
+      Operation acidOperation, HiveConf hiveConf) throws SemanticException {
+    Operator dummy = Operator.createDummy(); // dummy for backtracking
+    dummy.setParentOperators(Arrays.asList(input));
+
+    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceKeysBack = new ArrayList<ExprNodeDesc>();
+
+    // Compute join keys and store in reduceKeys
+    for (ExprNodeDesc key : keys) {
+      reduceKeys.add(key);
+      reduceKeysBack.add(ExprNodeDescUtils.backtrack(key, dummy, input));
+    }
+
+    // Walk over the input schema and copy in the output
+    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceValuesBack = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+    List<ColumnInfo> inputColumns = input.getSchema().getSignature();
+    ArrayList<ColumnInfo> outputColumns = new ArrayList<ColumnInfo>();
+    List<String> outputColumnNames = new ArrayList<String>();
+    int[] index = new int[inputColumns.size()];
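+    // index[i] records where the i-th input column ends up in the RS output:
+    // a value >= 0 means it is the index[i]-th reduce key, while a negative
+    // value v means it is the (-v - 1)-th reduce value.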
+    for (int i = 0; i < inputColumns.size(); i++) {
+      ColumnInfo colInfo = inputColumns.get(i);
+      String outputColName = colInfo.getInternalName();
+      ExprNodeColumnDesc expr = new ExprNodeColumnDesc(colInfo);
+
+      // backtrack can be null when input is script operator
+      ExprNodeDesc exprBack = ExprNodeDescUtils.backtrack(expr, dummy, input);
+      int kindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceKeysBack);
+      if (kindex >= 0) {
+        ColumnInfo newColInfo = new ColumnInfo(colInfo);
+        newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
+        newColInfo.setAlias(outputColName);
+        newColInfo.setTabAlias(tableAlias);
+        outputColumns.add(newColInfo);
+        index[i] = kindex;
+        continue;
+      }
+      int vindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceValuesBack);
+      if (vindex >= 0) {
+        index[i] = -vindex - 1;
+        continue;
+      }
+      index[i] = -reduceValues.size() - 1;
+
+      reduceValues.add(expr);
+      reduceValuesBack.add(exprBack);
+
+      ColumnInfo newColInfo = new ColumnInfo(colInfo);
+      newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName);
+      newColInfo.setAlias(outputColName);
+      newColInfo.setTabAlias(tableAlias);
+
+      outputColumns.add(newColInfo);
+      outputColumnNames.add(outputColName);
+    }
+    dummy.setParentOperators(null);
+
+    // Use only 1 reducer if no reduce keys
+    if (reduceKeys.size() == 0) {
+      numReducers = 1;
+
+      // Cartesian product is not supported in strict mode
+      String error = StrictChecks.checkCartesian(hiveConf);
+      if (error != null) {
+        throw new SemanticException(error);
+      }
+    }
+
+    ReduceSinkDesc rsDesc;
+    if (order.isEmpty()) {
+      rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag,
+          reduceKeys.size(), numReducers, acidOperation);
+    } else {
+      rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag,
+          partitionCols, order, nullOrder, numReducers, acidOperation);
+    }
+
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+        rsDesc, new RowSchema(outputColumns), input);
+
+    List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
+    for (int i = 0; i < keyColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i));
+    }
+    List<String> valColNames = rsDesc.getOutputValueColumnNames();
+    for (int i = 0; i < valColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.VALUE + "." + valColNames.get(i), reduceValues.get(i));
+    }
+
+    rsOp.setValueIndex(index);
+    rsOp.setColumnExprMap(colExprMap);
+    rsOp.setInputAliases(input.getSchema().getTableNames()
+        .toArray(new String[input.getSchema().getTableNames().size()]));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + rsOp + " with row schema: [" + rsOp.getSchema() + "]");
+    }
+
+    return rsOp;
+  }
+
+  private static Map<String, ExprNodeDesc> buildBacktrackFromReduceSink(List<String> keepColNames,
+      List<String> keyColNames, List<String> valueColNames, int[] index, Operator<?> inputOp) {
+    Map<String, ExprNodeDesc> columnDescriptors = new LinkedHashMap<String, ExprNodeDesc>();
+    int pos = 0;
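+    // keepColNames is an order-preserving subset of the schema's internal
+    // names, so a single forward scan with one cursor is sufficient.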
+    for (int i = 0; i < index.length; i++) {
+      ColumnInfo info = inputOp.getSchema().getSignature().get(i);
+      if (pos < keepColNames.size() &&
+              info.getInternalName().equals(keepColNames.get(pos))) {
+        String field;
+        if (index[i] >= 0) {
+          field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
+        } else {
+          field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1);
+        }
+        ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), field, info.getTabAlias(),
+            info.getIsVirtualCol());
+        columnDescriptors.put(keepColNames.get(pos), desc);
+        pos++;
+      }
+    }
+    return columnDescriptors;
+  }
+
+  static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias, Set<Integer> vcolsInCalcite) {
+    return rn.accept(new ExprNodeConverter(tabAlias, inputRel.getRowType(), vcolsInCalcite,
+        inputRel.getCluster().getTypeFactory(), true));
+  }
+
+  static ArrayList<ColumnInfo> createColInfos(Operator<?> input) {
+    ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
+    for (ColumnInfo ci : input.getSchema().getSignature()) {
+      cInfoLst.add(new ColumnInfo(ci));
+    }
+    return cInfoLst;
+  }
+
+  private static ArrayList<ColumnInfo> createColInfosSubset(Operator<?> input,
+          List<String> keepColNames) {
+    ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
+    int pos = 0;
+    for (ColumnInfo ci : input.getSchema().getSignature()) {
+      if (pos < keepColNames.size() &&
+              ci.getInternalName().equals(keepColNames.get(pos))) {
+        cInfoLst.add(new ColumnInfo(ci));
+        pos++;
+      }
+    }
+    return cInfoLst;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveProjectVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveProjectVisitor.java
new file mode 100644
index 0000000..6b133c5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveProjectVisitor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.WindowingComponentizer;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+class HiveProjectVisitor extends HiveRelNodeVisitor<HiveProject> {
+  HiveProjectVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(HiveProject projectRel) throws SemanticException {
+    OpAttr inputOpAf = hiveOpConverter.dispatch(projectRel.getInput());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + projectRel.getId() + ":"
+          + projectRel.getRelTypeName() + " with row type: [" + projectRel.getRowType() + "]");
+    }
+
+    WindowingSpec windowingSpec = new WindowingSpec();
+    List<String> exprNames = new ArrayList<String>(projectRel.getRowType().getFieldNames());
+    List<ExprNodeDesc> exprCols = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    for (int pos = 0; pos < projectRel.getChildExps().size(); pos++) {
+      ExprNodeConverter converter = new ExprNodeConverter(inputOpAf.tabAlias, projectRel
+          .getRowType().getFieldNames().get(pos), projectRel.getInput().getRowType(),
+          projectRel.getRowType(), inputOpAf.vcolsInCalcite, projectRel.getCluster().getTypeFactory(),
+          true);
+      ExprNodeDesc exprCol = projectRel.getChildExps().get(pos).accept(converter);
+      colExprMap.put(exprNames.get(pos), exprCol);
+      exprCols.add(exprCol);
+      // TODO: should columns that come through a PTF retain their virtual column status?
+      if (converter.getWindowFunctionSpec() != null) {
+        for (WindowFunctionSpec wfs : converter.getWindowFunctionSpec()) {
+          windowingSpec.addWindowFunction(wfs);
+        }
+      }
+    }
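+    // Window function calls collected while converting the expressions are
+    // peeled off into a PTF operator chain below the Select.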
+    if (windowingSpec.getWindowExpressions() != null
+        && !windowingSpec.getWindowExpressions().isEmpty()) {
+      inputOpAf = genPTF(inputOpAf, windowingSpec);
+    }
+    // TODO: is this a safe assumption (name collision, external names...)
+    SelectDesc sd = new SelectDesc(exprCols, exprNames);
+    Pair<ArrayList<ColumnInfo>, Set<Integer>> colInfoVColPair = createColInfos(projectRel.getChildExps(), exprCols,
+        exprNames, inputOpAf);
+    SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(sd, new RowSchema(
+        colInfoVColPair.getKey()), inputOpAf.inputs.get(0));
+    selOp.setColumnExprMap(colExprMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + selOp + " with row schema: [" + selOp.getSchema() + "]");
+    }
+
+    return new OpAttr(inputOpAf.tabAlias, colInfoVColPair.getValue(), selOp);
+  }
+
+  private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticException {
+    Operator<?> input = inputOpAf.inputs.get(0);
+
+    wSpec.validateAndMakeEffective();
+    WindowingComponentizer groups = new WindowingComponentizer(wSpec);
+    RowResolver rr = new RowResolver();
+    for (ColumnInfo ci : input.getSchema().getSignature()) {
+      rr.put(inputOpAf.tabAlias, ci.getInternalName(), ci);
+    }
+
+    while (groups.hasNext()) {
+      wSpec = groups.next(hiveOpConverter.getHiveConf(), hiveOpConverter.getSemanticAnalyzer(),
+          hiveOpConverter.getUnparseTranslator(), rr);
+
+      // 1. Create RS and backtrack Select operator on top
+      ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+      StringBuilder order = new StringBuilder();
+      StringBuilder nullOrder = new StringBuilder();
+
+      for (PartitionExpression partCol : wSpec.getQueryPartitionSpec().getExpressions()) {
+        ExprNodeDesc partExpr = hiveOpConverter.getSemanticAnalyzer().genExprNodeDesc(partCol.getExpression(), rr);
+        if (ExprNodeDescUtils.indexOf(partExpr, partCols) < 0) {
+          keyCols.add(partExpr);
+          partCols.add(partExpr);
+          order.append('+');
+          nullOrder.append('a');
+        }
+      }
+
+      if (wSpec.getQueryOrderSpec() != null) {
+        for (OrderExpression orderCol : wSpec.getQueryOrderSpec().getExpressions()) {
+          ExprNodeDesc orderExpr = hiveOpConverter.getSemanticAnalyzer().genExprNodeDesc(orderCol.getExpression(), rr);
+          char orderChar = orderCol.getOrder() == PTFInvocationSpec.Order.ASC ? '+' : '-';
+          char nullOrderChar = orderCol.getNullOrder() == PTFInvocationSpec.NullOrder.NULLS_FIRST ? 'a' : 'z';
+          int index = ExprNodeDescUtils.indexOf(orderExpr, keyCols);
+          if (index >= 0) {
+            order.setCharAt(index, orderChar);
+            nullOrder.setCharAt(index, nullOrderChar);
+            continue;
+          }
+          keyCols.add(orderExpr);
+          order.append(orderChar);
+          nullOrder.append(nullOrderChar);
+        }
+      }
+
+      SelectOperator selectOp = genReduceSinkAndBacktrackSelect(input,
+          keyCols.toArray(new ExprNodeDesc[keyCols.size()]), 0, partCols,
+          order.toString(), nullOrder.toString(), -1, Operation.NOT_ACID, hiveOpConverter.getHiveConf());
+
+      // 2. Finally create PTF
+      PTFTranslator translator = new PTFTranslator();
+      PTFDesc ptfDesc = translator.translate(wSpec, hiveOpConverter.getSemanticAnalyzer(),
+          hiveOpConverter.getHiveConf(), rr, hiveOpConverter.getUnparseTranslator());
+      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+
+      Operator<?> ptfOp = OperatorFactory.getAndMakeChild(
+          ptfDesc, new RowSchema(ptfOpRR.getColumnInfos()), selectOp);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + ptfOp + " with row schema: [" + ptfOp.getSchema() + "]");
+      }
+
+      // 3. Prepare for next iteration (if any)
+      rr = ptfOpRR;
+      input = ptfOp;
+    }
+
+    return inputOpAf.clone(input);
+  }
+
+  private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input,
+          ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order,
+          String nullOrder, int numReducers, Operation acidOperation, HiveConf hiveConf)
+              throws SemanticException {
+    return HiveOpConverterUtils.genReduceSinkAndBacktrackSelect(input, keys, tag, partitionCols, order, nullOrder,
+        numReducers, acidOperation, hiveConf, input.getSchema().getColumnNames());
+  }
+
+  private Pair<ArrayList<ColumnInfo>, Set<Integer>> createColInfos(List<RexNode> calciteExprs,
+      List<ExprNodeDesc> hiveExprs, List<String> projNames, OpAttr inpOpAf) {
+    if (hiveExprs.size() != projNames.size()) {
+      throw new RuntimeException("Column expressions list doesn't match Column Names list");
+    }
+
+    RexNode rexN;
+    ExprNodeDesc pe;
+    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>();
+    boolean vc;
+    Set<Integer> newVColSet = new HashSet<Integer>();
+    for (int i = 0; i < hiveExprs.size(); i++) {
+      pe = hiveExprs.get(i);
+      rexN = calciteExprs.get(i);
+      vc = false;
+      if (rexN instanceof RexInputRef) {
+        if (inpOpAf.vcolsInCalcite.contains(((RexInputRef) rexN).getIndex())) {
+          newVColSet.add(i);
+          vc = true;
+        }
+      }
+      colInfos.add(new ColumnInfo(projNames.get(i), pe.getTypeInfo(), inpOpAf.tabAlias, vc));
+    }
+
+    return new Pair<ArrayList<ColumnInfo>, Set<Integer>>(colInfos, newVColSet);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveRelNodeVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveRelNodeVisitor.java
new file mode 100644
index 0000000..1417ffd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveRelNodeVisitor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract ancestor of all the visitors.
+ */
+abstract class HiveRelNodeVisitor<T extends RelNode> {
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveRelNodeVisitor.class);
+
+  protected final HiveOpConverter hiveOpConverter;
+
+  HiveRelNodeVisitor(HiveOpConverter hiveOpConverter) {
+    this.hiveOpConverter = hiveOpConverter;
+  }
+
+  abstract OpAttr visit(T hiveRelNode) throws SemanticException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveSortExchangeVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveSortExchangeVisitor.java
new file mode 100644
index 0000000..68227db
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveSortExchangeVisitor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+class HiveSortExchangeVisitor extends HiveRelNodeVisitor<HiveSortExchange> {
+  HiveSortExchangeVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(HiveSortExchange exchangeRel) throws SemanticException {
+    OpAttr inputOpAf = hiveOpConverter.dispatch(exchangeRel.getInput());
+    String tabAlias = inputOpAf.tabAlias;
+    if (tabAlias == null || tabAlias.length() == 0) {
+      tabAlias = hiveOpConverter.getHiveDerivedTableAlias();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + exchangeRel.getId() + ":"
+          + exchangeRel.getRelTypeName() + " with row type: [" + exchangeRel.getRowType() + "]");
+    }
+
+    RelDistribution distribution = exchangeRel.getDistribution();
+    if (distribution.getType() != Type.HASH_DISTRIBUTED) {
+      throw new SemanticException("Only hash distribution supported for LogicalExchange");
+    }
+    ExprNodeDesc[] expressions = new ExprNodeDesc[exchangeRel.getJoinKeys().size()];
+    for (int index = 0; index < exchangeRel.getJoinKeys().size(); index++) {
+      expressions[index] = HiveOpConverterUtils.convertToExprNode(exchangeRel.getJoinKeys().get(index),
+          exchangeRel.getInput(), inputOpAf.tabAlias, inputOpAf.vcolsInCalcite);
+    }
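+    // Store the converted key expressions on the exchange rel; later
+    // translation steps (the join visitor) are expected to reuse them.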
+    exchangeRel.setJoinExpressions(expressions);
+
+    ReduceSinkOperator rsOp = genReduceSink(inputOpAf.inputs.get(0), tabAlias, expressions,
+        -1, -1, Operation.NOT_ACID, hiveOpConverter.getHiveConf());
+
+    return new OpAttr(tabAlias, inputOpAf.vcolsInCalcite, rsOp);
+  }
+
+  private static ReduceSinkOperator genReduceSink(Operator<?> input, String tableAlias, ExprNodeDesc[] keys, int tag,
+      int numReducers, Operation acidOperation, HiveConf hiveConf) throws SemanticException {
+    return HiveOpConverterUtils.genReduceSink(input, tableAlias, keys, tag, new ArrayList<ExprNodeDesc>(), "", "",
+        numReducers, acidOperation, hiveConf);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveSortLimitVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveSortLimitVisitor.java
new file mode 100644
index 0000000..d30defd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveSortLimitVisitor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+
+class HiveSortLimitVisitor extends HiveRelNodeVisitor<HiveSortLimit> {
+  HiveSortLimitVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(HiveSortLimit sortRel) throws SemanticException {
+    OpAttr inputOpAf = hiveOpConverter.dispatch(sortRel.getInput());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+          + " with row type: [" + sortRel.getRowType() + "]");
+      if (sortRel.getCollation() == RelCollations.EMPTY) {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+            + " consists of limit");
+      } else if (sortRel.fetch == null) {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+            + " consists of sort");
+      } else {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+            + " consists of sort+limit");
+      }
+    }
+
+    Operator<?> inputOp = inputOpAf.inputs.get(0);
+    Operator<?> resultOp = inputOpAf.inputs.get(0);
+
+    // 1. If we need to sort tuples based on the value of some
+    // of their columns
+    if (sortRel.getCollation() != RelCollations.EMPTY) {
+
+      // In strict mode, in the presence of order by, limit must be specified.
+      if (sortRel.fetch == null) {
+        String error = StrictChecks.checkNoLimit(hiveOpConverter.getHiveConf());
+        if (error != null) {
+          throw new SemanticException(error);
+        }
+      }
+
+      // 1.a. Extract order for each column from collation
+      // Generate sortCols and order
+      ImmutableBitSet.Builder sortColsPosBuilder = ImmutableBitSet.builder();
+      ImmutableBitSet.Builder sortOutputColsPosBuilder = ImmutableBitSet.builder();
+      Map<Integer, RexNode> obRefToCallMap = sortRel.getInputRefToCallMap();
+      List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+      StringBuilder order = new StringBuilder();
+      StringBuilder nullOrder = new StringBuilder();
+      for (RelFieldCollation sortInfo : sortRel.getCollation().getFieldCollations()) {
+        int sortColumnPos = sortInfo.getFieldIndex();
+        ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature()
+            .get(sortColumnPos));
+        ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(),
+            columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol());
+        sortCols.add(sortColumn);
+        if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) {
+          order.append("-");
+        } else {
+          order.append("+");
+        }
+        if (sortInfo.nullDirection == RelFieldCollation.NullDirection.FIRST) {
+          nullOrder.append("a");
+        } else if (sortInfo.nullDirection == RelFieldCollation.NullDirection.LAST) {
+          nullOrder.append("z");
+        } else {
+          // Default
+          nullOrder.append(sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING ? "z" : "a");
+        }
+
+        if (obRefToCallMap != null) {
+          RexNode obExpr = obRefToCallMap.get(sortColumnPos);
+          sortColsPosBuilder.set(sortColumnPos);
+          if (obExpr == null) {
+            sortOutputColsPosBuilder.set(sortColumnPos);
+          }
+        }
+      }
+      // Use only 1 reducer for order by
+      int numReducers = 1;
+
+      // We keep only the columns that are part of the final output
+      List<String> keepColumns = new ArrayList<String>();
+      final ImmutableBitSet sortColsPos = sortColsPosBuilder.build();
+      final ImmutableBitSet sortOutputColsPos = sortOutputColsPosBuilder.build();
+      final List<ColumnInfo> inputSchema = inputOp.getSchema().getSignature();
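+      // Keep a column if the ORDER BY rewrite left it untouched, or if it is
+      // a sort key that is also part of the final output; sort-only
+      // expressions introduced for the ORDER BY are dropped.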
+      for (int pos = 0; pos < inputSchema.size(); pos++) {
+        if ((sortColsPos.get(pos) && sortOutputColsPos.get(pos)) ||
+                (!sortColsPos.get(pos) && !sortOutputColsPos.get(pos))) {
+          keepColumns.add(inputSchema.get(pos).getInternalName());
+        }
+      }
+
+      // 1.b. Generate reduce sink and project operator
+      resultOp = HiveOpConverterUtils.genReduceSinkAndBacktrackSelect(resultOp, sortCols.toArray(
+          new ExprNodeDesc[sortCols.size()]), 0, new ArrayList<ExprNodeDesc>(), order.toString(), nullOrder.toString(),
+          numReducers, Operation.NOT_ACID, hiveOpConverter.getHiveConf(), keepColumns);
+    }
+
+    // 2. If we need to generate limit
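+    //    (Hive represents OFFSET/FETCH as a single LimitOperator carrying both values)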
+    if (sortRel.fetch != null) {
+      int limit = RexLiteral.intValue(sortRel.fetch);
+      int offset = sortRel.offset == null ? 0 : RexLiteral.intValue(sortRel.offset);
+      LimitDesc limitDesc = new LimitDesc(offset, limit);
+      ArrayList<ColumnInfo> cinfoLst = HiveOpConverterUtils.createColInfos(resultOp);
+      resultOp = OperatorFactory.getAndMakeChild(limitDesc, new RowSchema(cinfoLst), resultOp);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
+      }
+    }
+
+    // 3. Return result
+    return inputOpAf.clone(resultOp);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveTableFunctionScanVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveTableFunctionScanVisitor.java
new file mode 100644
index 0000000..55455f0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveTableFunctionScanVisitor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+class HiveTableFunctionScanVisitor extends HiveRelNodeVisitor<HiveTableFunctionScan> {
+  HiveTableFunctionScanVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(HiveTableFunctionScan scanRel) throws SemanticException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + scanRel.getId() + ":"
+          + scanRel.getRelTypeName() + " with row type: [" + scanRel.getRowType() + "]");
+    }
+
+    RexCall call = (RexCall) scanRel.getCall();
+
+    RowResolver rowResolver = new RowResolver();
+    List<String> fieldNames = new ArrayList<>(scanRel.getRowType().getFieldNames());
+    List<String> exprNames = new ArrayList<>(fieldNames);
+    List<ExprNodeDesc> exprCols = new ArrayList<>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<>();
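+    // Convert each operand of the table function call into an ExprNodeDesc and
+    // register it in the row resolver under the corresponding field name.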
+    for (int pos = 0; pos < call.getOperands().size(); pos++) {
+      ExprNodeConverter converter = new ExprNodeConverter(SemanticAnalyzer.DUMMY_TABLE, fieldNames.get(pos),
+          scanRel.getRowType(), scanRel.getRowType(), ((HiveTableScan)scanRel.getInput(0)).getPartOrVirtualCols(),
+          scanRel.getCluster().getTypeFactory(), true);
+      ExprNodeDesc exprCol = call.getOperands().get(pos).accept(converter);
+      colExprMap.put(exprNames.get(pos), exprCol);
+      exprCols.add(exprCol);
+
+      ColumnInfo columnInfo = new ColumnInfo(fieldNames.get(pos), exprCol.getWritableObjectInspector(), null, false);
+      rowResolver.put(columnInfo.getTabAlias(), columnInfo.getAlias(), columnInfo);
+    }
+
+    OpAttr inputOpAf = hiveOpConverter.dispatch(scanRel.getInputs().get(0));
+    TableScanOperator op = (TableScanOperator)inputOpAf.inputs.get(0);
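+    // Cap the underlying table scan at a single row, so the UDTF is invoked
+    // exactly once.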
+    op.getConf().setRowLimit(1);
+
+    Operator<?> output = OperatorFactory.getAndMakeChild(new SelectDesc(exprCols, fieldNames, false),
+        new RowSchema(rowResolver.getRowSchema()), op);
+    output.setColumnExprMap(colExprMap);
+
+    Operator<?> funcOp = genUDTFPlan(call, fieldNames, output, rowResolver);
+
+    return new OpAttr(null, new HashSet<Integer>(), funcOp);
+  }
+
+  private Operator<?> genUDTFPlan(RexCall call, List<String> colAliases, Operator<?> input, RowResolver rowResolver)
+      throws SemanticException {
+    LOG.debug("genUDTFPlan, Col aliases: {}", colAliases);
+
+    GenericUDTF genericUDTF = createGenericUDTF(call);
+    StructObjectInspector rowOI = createStructObjectInspector(rowResolver, colAliases);
+    StructObjectInspector outputOI = genericUDTF.initialize(rowOI);
+    List<ColumnInfo> columnInfos = createColumnInfos(outputOI);
+
+    // Add the UDTFOperator to the operator DAG
+    return OperatorFactory.getAndMakeChild(new UDTFDesc(genericUDTF, false), new RowSchema(columnInfos), input);
+  }
+
+  private GenericUDTF createGenericUDTF(RexCall call) throws SemanticException {
+    String functionName = call.getOperator().getName();
+    FunctionInfo fi = FunctionRegistry.getFunctionInfo(functionName);
+    return fi.getGenericUDTF();
+  }
+
+  private StructObjectInspector createStructObjectInspector(RowResolver rowResolver, List<String> colAliases)
+      throws SemanticException {
+    // Create the object inspector for the input columns; the caller uses it to initialize the UDTF
+    List<String> colNames = rowResolver.getColumnInfos().stream().map(ColumnInfo::getInternalName)
+        .collect(Collectors.toList());
+    List<ObjectInspector> colOIs = rowResolver.getColumnInfos().stream().map(ColumnInfo::getObjectInspector)
+        .collect(Collectors.toList());
+
+    return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs);
+  }
+
+  private List<ColumnInfo> createColumnInfos(StructObjectInspector outputOI) {
+    // Generate the output column infos / row resolver using internal names.
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    for (StructField sf : outputOI.getAllStructFieldRefs()) {
+
+      // Since the UDTF operator feeds into an LVJ operator that will rename all the internal names, we can just use
+      // the field name from the UDTF's OI as the internal name
+      ColumnInfo col = new ColumnInfo(sf.getFieldName(),
+          TypeInfoUtils.getTypeInfoFromObjectInspector(sf.getFieldObjectInspector()), null, false);
+      columnInfos.add(col);
+    }
+    return columnInfos;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveTableScanVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveTableScanVisitor.java
new file mode 100644
index 0000000..14958aa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveTableScanVisitor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+class HiveTableScanVisitor extends HiveRelNodeVisitor<HiveTableScan> {
+  HiveTableScanVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  /**
+   * TODO: 1. PPD needs to get pushed into TS.
+   */
+  @Override
+  OpAttr visit(HiveTableScan scanRel) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName()
+          + " with row type: [" + scanRel.getRowType() + "]");
+    }
+
+    RelOptHiveTable ht = (RelOptHiveTable) scanRel.getTable();
+
+    // 1. Setup TableScan Desc
+    // 1.1 Build col details used by scan
+    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>();
+    List<VirtualColumn> virtualCols = new ArrayList<VirtualColumn>();
+    List<Integer> neededColumnIDs = new ArrayList<Integer>();
+    List<String> neededColumnNames = new ArrayList<String>();
+    Set<Integer> vcolsInCalcite = new HashSet<Integer>();
+
+    List<String> partColNames = new ArrayList<String>();
+    Map<Integer, VirtualColumn> vColsMap = HiveCalciteUtil.getVColsMap(ht.getVirtualCols(),
+        ht.getNoOfNonVirtualCols());
+    Map<Integer, ColumnInfo> posToPartColInfo = ht.getPartColInfoMap();
+    Map<Integer, ColumnInfo> posToNonPartColInfo = ht.getNonPartColInfoMap();
+    List<Integer> neededColIndxsFrmReloptHT = scanRel.getNeededColIndxsFrmReloptHT();
+    List<String> scanColNames = scanRel.getRowType().getFieldNames();
+    String tableAlias = scanRel.getConcatQbIDAlias();
+
+    String colName;
+    ColumnInfo colInfo;
+    VirtualColumn vc;
+
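+    // Classify each field of the scan row type as virtual, partition or
+    // regular column, and record the ones that are actually needed.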
+    for (int index = 0; index < scanRel.getRowType().getFieldList().size(); index++) {
+      colName = scanColNames.get(index);
+      if (vColsMap.containsKey(index)) {
+        vc = vColsMap.get(index);
+        virtualCols.add(vc);
+        colInfo = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true, vc.getIsHidden());
+        vcolsInCalcite.add(index);
+      } else if (posToPartColInfo.containsKey(index)) {
+        partColNames.add(colName);
+        colInfo = posToPartColInfo.get(index);
+        vcolsInCalcite.add(index);
+      } else {
+        colInfo = posToNonPartColInfo.get(index);
+      }
+      colInfos.add(colInfo);
+      if (neededColIndxsFrmReloptHT.contains(index)) {
+        neededColumnIDs.add(index);
+        neededColumnNames.add(colName);
+      }
+    }
+
+    // 1.2 Create TableScanDesc
+    TableScanDesc tsd = new TableScanDesc(tableAlias, virtualCols, ht.getHiveTableMD());
+
+    // 1.3. Set Partition cols in TSDesc
+    tsd.setPartColumns(partColNames);
+
+    // 1.4. Set needed cols in TSDesc
+    tsd.setNeededColumnIDs(neededColumnIDs);
+    tsd.setNeededColumns(neededColumnNames);
+
+    // 2. Setup TableScan
+    TableScanOperator ts = (TableScanOperator) OperatorFactory.get(
+        hiveOpConverter.getSemanticAnalyzer().getOpContext(), tsd, new RowSchema(colInfos));
+
+    // Now that we let Calcite process subqueries, we might have more than one
+    // table scan with the same alias.
+    if (hiveOpConverter.getTopOps().get(tableAlias) != null) {
+      tableAlias = tableAlias + hiveOpConverter.getUniqueCounter();
+    }
+    hiveOpConverter.getTopOps().put(tableAlias, ts);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + ts + " with row schema: [" + ts.getSchema() + "]");
+    }
+
+    return new OpAttr(tableAlias, vcolsInCalcite, ts);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveUnionVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveUnionVisitor.java
new file mode 100644
index 0000000..9ad5eeb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveUnionVisitor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+class HiveUnionVisitor extends HiveRelNodeVisitor<HiveUnion> {
+  HiveUnionVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(HiveUnion unionRel) throws SemanticException {
+    // 1. Convert inputs
+    List<RelNode> inputsList = extractRelNodeFromUnion(unionRel);
+    OpAttr[] inputs = new OpAttr[inputsList.size()];
+    for (int i = 0; i < inputs.length; i++) {
+      inputs[i] = hiveOpConverter.dispatch(inputsList.get(i));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + unionRel.getId() + ":" + unionRel.getRelTypeName()
+          + " with row type: [" + unionRel.getRowType() + "]");
+    }
+
+    // 2. Create a new union operator
+    UnionDesc unionDesc = new UnionDesc();
+    unionDesc.setNumInputs(inputs.length);
+    String tableAlias = hiveOpConverter.getHiveDerivedTableAlias();
+    ArrayList<ColumnInfo> cinfoLst = createColInfos(inputs[0].inputs.get(0), tableAlias);
+    Operator<?>[] children = new Operator<?>[inputs.length];
+    for (int i = 0; i < children.length; i++) {
+      if (i == 0) {
+        children[i] = inputs[i].inputs.get(0);
+      } else {
+        Operator<?> op = inputs[i].inputs.get(0);
+        // We need to check whether the other input branches of the union follow the first branch.
+        // We may need to cast the data types of specific columns.
+        children[i] = genInputSelectForUnion(op, cinfoLst);
+      }
+    }
+    Operator<? extends OperatorDesc> unionOp = OperatorFactory.getAndMakeChild(
+        hiveOpConverter.getSemanticAnalyzer().getOpContext(), unionDesc, new RowSchema(cinfoLst), children);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + unionOp + " with row schema: [" + unionOp.getSchema() + "]");
+    }
+
+    // TODO: Can columns retain virtualness out of union?
+    // 3. Return result
+    return new OpAttr(tableAlias, inputs[0].vcolsInCalcite, unionOp);
+  }
+
+  // Use this function to make the union "flat" for both execution and explain purposes.
+  private List<RelNode> extractRelNodeFromUnion(HiveUnion unionRel) {
+    List<RelNode> ret = new ArrayList<RelNode>();
+    for (RelNode input : unionRel.getInputs()) {
+      if (input instanceof HiveUnion) {
+        ret.addAll(extractRelNodeFromUnion((HiveUnion) input));
+      } else {
+        ret.add(input);
+      }
+    }
+    return ret;
+  }
+
+  // Create column infos with the new tableAlias.
+  private ArrayList<ColumnInfo> createColInfos(Operator<?> input, String tableAlias) {
+    ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
+    for (ColumnInfo ci : input.getSchema().getSignature()) {
+      ColumnInfo copyOfColumnInfo = new ColumnInfo(ci);
+      copyOfColumnInfo.setTabAlias(tableAlias);
+      cInfoLst.add(copyOfColumnInfo);
+    }
+    return cInfoLst;
+  }
+
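+  // If a branch's schema differs from the union schema (column names, aliases
+  // or types), interpose a SELECT that renames and casts its columns;
+  // otherwise the branch is returned unchanged.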
+  private Operator<? extends OperatorDesc> genInputSelectForUnion(Operator<? extends OperatorDesc> origInputOp,
+      ArrayList<ColumnInfo> uColumnInfo) throws SemanticException {
+    Iterator<ColumnInfo> oIter = origInputOp.getSchema().getSignature().iterator();
+    Iterator<ColumnInfo> uIter = uColumnInfo.iterator();
+    List<ExprNodeDesc> columns = new ArrayList<ExprNodeDesc>();
+    List<String> colName = new ArrayList<String>();
+    Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
+    boolean needSelectOp = false;
+    while (oIter.hasNext()) {
+      ColumnInfo oInfo = oIter.next();
+      ColumnInfo uInfo = uIter.next();
+      if (!oInfo.isSameColumnForRR(uInfo)) {
+        needSelectOp = true;
+      }
+      ExprNodeDesc column = new ExprNodeColumnDesc(oInfo.getType(), oInfo.getInternalName(),
+          oInfo.getTabAlias(), oInfo.getIsVirtualCol(), oInfo.isSkewedCol());
+      if (!oInfo.getType().equals(uInfo.getType())) {
+        column = ParseUtils.createConversionCast(column, (PrimitiveTypeInfo) uInfo.getType());
+      }
+      columns.add(column);
+      colName.add(uInfo.getInternalName());
+      columnExprMap.put(uInfo.getInternalName(), column);
+    }
+    if (needSelectOp) {
+      return OperatorFactory.getAndMakeChild(new SelectDesc(
+          columns, colName), new RowSchema(uColumnInfo), columnExprMap, origInputOp);
+    } else {
+      return origInputOp;
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/JoinVisitor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/JoinVisitor.java
new file mode 100644
index 0000000..0286d54
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/JoinVisitor.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveMultiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.JoinCond;
+import org.apache.hadoop.hive.ql.parse.JoinType;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+class JoinVisitor extends HiveRelNodeVisitor<RelNode> {
+  JoinVisitor(HiveOpConverter hiveOpConverter) {
+    super(hiveOpConverter);
+  }
+
+  @Override
+  OpAttr visit(RelNode joinRel) throws SemanticException {
+    // 0. Additional data structures needed for the join optimization
+    // through Hive
+    String[] baseSrc = new String[joinRel.getInputs().size()];
+    String tabAlias = hiveOpConverter.getHiveDerivedTableAlias();
+
+    // 1. Convert inputs
+    OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()];
+    List<Operator<?>> children = new ArrayList<Operator<?>>(joinRel.getInputs().size());
+    for (int i = 0; i < inputs.length; i++) {
+      inputs[i] = hiveOpConverter.dispatch(joinRel.getInput(i));
+      children.add(inputs[i].inputs.get(0));
+      baseSrc[i] = inputs[i].tabAlias;
+    }
+
+    // 2. Generate tags
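+    //    (each ReduceSink feeding the join is tagged with its input position,
+    //    which is how the join identifies the side a row comes from)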
+    for (int tag = 0; tag < children.size(); tag++) {
+      ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) children.get(tag);
+      reduceSinkOp.getConf().setTag(tag);
+    }
+
+    // 3. Virtual columns
+    Set<Integer> newVcolsInCalcite = new HashSet<Integer>();
+    newVcolsInCalcite.addAll(inputs[0].vcolsInCalcite);
+    if (joinRel instanceof HiveMultiJoin || !((joinRel instanceof Join) && ((Join) joinRel).isSemiJoin())) {
+      int shift = inputs[0].inputs.get(0).getSchema().getSignature().size();
+      for (int i = 1; i < inputs.length; i++) {
+        newVcolsInCalcite.addAll(HiveCalciteUtil.shiftVColsSet(inputs[i].vcolsInCalcite, shift));
+        shift += inputs[i].inputs.get(0).getSchema().getSignature().size();
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + joinRel.getId() + ":" + joinRel.getRelTypeName()
+          + " with row type: [" + joinRel.getRowType() + "]");
+    }
+
+    // 4. Extract join key expressions from HiveSortExchange
+    ExprNodeDesc[][] joinExpressions = new ExprNodeDesc[inputs.length][];
+    for (int i = 0; i < inputs.length; i++) {
+      joinExpressions[i] = ((HiveSortExchange) joinRel.getInput(i)).getJoinExpressions();
+    }
+
+    // 5. Extract rest of join predicate info. We infer the rest of join condition
+    //    that will be added to the filters (join conditions that are not part of
+    //    the join key)
+    List<RexNode> joinFilters;
+    if (joinRel instanceof HiveJoin) {
+      joinFilters = ImmutableList.of(((HiveJoin) joinRel).getJoinFilter());
+    } else if (joinRel instanceof HiveMultiJoin) {
+      joinFilters = ((HiveMultiJoin) joinRel).getJoinFilters();
+    } else if (joinRel instanceof HiveSemiJoin) {
+      joinFilters = ImmutableList.of(((HiveSemiJoin) joinRel).getJoinFilter());
+    } else {
+      throw new SemanticException("Can't handle join type: " + joinRel.getClass().getName());
+    }
+    List<List<ExprNodeDesc>> filterExpressions = Lists.newArrayList();
+    for (int i = 0; i < joinFilters.size(); i++) {
+      List<ExprNodeDesc> filterExpressionsForInput = new ArrayList<ExprNodeDesc>();
+      if (joinFilters.get(i) != null) {
+        for (RexNode conj : RelOptUtil.conjunctions(joinFilters.get(i))) {
+          ExprNodeDesc expr = HiveOpConverterUtils.convertToExprNode(conj, joinRel, null, newVcolsInCalcite);
+          filterExpressionsForInput.add(expr);
+        }
+      }
+      filterExpressions.add(filterExpressionsForInput);
+    }
+
+    // 6. Generate Join operator
+    JoinOperator joinOp = genJoin(joinRel, joinExpressions, filterExpressions, children, baseSrc, tabAlias);
+
+    // 7. Return result
+    return new OpAttr(tabAlias, newVcolsInCalcite, joinOp);
+  }
+
+  private JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressions,
+      List<List<ExprNodeDesc>> filterExpressions, List<Operator<?>> children, String[] baseSrc, String tabAlias)
+          throws SemanticException {
+
+    // 1. Extract join type
+    JoinCondDesc[] joinCondns;
+    boolean semiJoin;
+    boolean noOuterJoin;
+    if (join instanceof HiveMultiJoin) {
+      HiveMultiJoin hmj = (HiveMultiJoin) join;
+      joinCondns = new JoinCondDesc[hmj.getJoinInputs().size()];
+      for (int i = 0; i < hmj.getJoinInputs().size(); i++) {
+        joinCondns[i] = new JoinCondDesc(new JoinCond(
+                hmj.getJoinInputs().get(i).left,
+                hmj.getJoinInputs().get(i).right,
+                transformJoinType(hmj.getJoinTypes().get(i))));
+      }
+      semiJoin = false;
+      noOuterJoin = !hmj.isOuterJoin();
+    } else {
+      joinCondns = new JoinCondDesc[1];
+      semiJoin = (join instanceof Join) && ((Join) join).isSemiJoin();
+      JoinType joinType;
+      if (semiJoin) {
+        joinType = JoinType.LEFTSEMI;
+      } else {
+        joinType = transformJoinType(((Join)join).getJoinType());
+      }
+      joinCondns[0] = new JoinCondDesc(new JoinCond(0, 1, joinType));
+      noOuterJoin = joinType != JoinType.FULLOUTER && joinType != JoinType.LEFTOUTER
+              && joinType != JoinType.RIGHTOUTER;
+    }
+
+    // 2. We create the join aux structures
+    ArrayList<ColumnInfo> outputColumns = new ArrayList<ColumnInfo>();
+    ArrayList<String> outputColumnNames = new ArrayList<String>(join.getRowType()
+        .getFieldNames());
+    Operator<?>[] childOps = new Operator[children.size()];
+
+    Map<String, Byte> reversedExprs = new HashMap<String, Byte>();
+    Map<Byte, List<ExprNodeDesc>> exprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+    Map<Byte, List<ExprNodeDesc>> filters = new HashMap<Byte, List<ExprNodeDesc>>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    HashMap<Integer, Set<String>> posToAliasMap = new HashMap<Integer, Set<String>>();
+
+    int outputPos = 0;
+    for (int pos = 0; pos < children.size(); pos++) {
+      // 2.1. Backtracking from RS
+      ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos);
+      if (inputRS.getNumParent() != 1) {
+        throw new SemanticException("RS should have single parent");
+      }
+      Operator<?> parent = inputRS.getParentOperators().get(0);
+      ReduceSinkDesc rsDesc = inputRS.getConf();
+      int[] index = inputRS.getValueIndex();
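+      // Non-negative entries of index point into the RS key columns; a value
+      // column at position p is encoded as -(p + 1).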
+      Byte tag = (byte) rsDesc.getTag();
+
+      // 2.1.1. If semijoin, inputs other than the first do not project any output columns
+      if (semiJoin && pos != 0) {
+        exprMap.put(tag, new ArrayList<ExprNodeDesc>());
+        childOps[pos] = inputRS;
+        continue;
+      }
+
+      posToAliasMap.put(pos, new HashSet<String>(inputRS.getSchema().getTableNames()));
+      List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
+      List<String> valColNames = rsDesc.getOutputValueColumnNames();
+
+      Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSinkForJoin(outputPos, outputColumnNames,
+          keyColNames, valColNames, index, parent, baseSrc[pos]);
+
+      List<ColumnInfo> parentColumns = parent.getSchema().getSignature();
+      for (int i = 0; i < index.length; i++) {
+        ColumnInfo info = new ColumnInfo(parentColumns.get(i));
+        info.setInternalName(outputColumnNames.get(outputPos));
+        info.setTabAlias(tabAlias);
+        outputColumns.add(info);
+        reversedExprs.put(outputColumnNames.get(outputPos), tag);
+        outputPos++;
+      }
+
+      exprMap.put(tag, new ArrayList<ExprNodeDesc>(descriptors.values()));
+      colExprMap.putAll(descriptors);
+      childOps[pos] = inputRS;
+    }
+
+    // 3. We populate the filters and filterMap structure needed in the join descriptor
+    List<List<ExprNodeDesc>> filtersPerInput = Lists.newArrayList();
+    int[][] filterMap = new int[children.size()][];
+    for (int i = 0; i < children.size(); i++) {
+      filtersPerInput.add(new ArrayList<ExprNodeDesc>());
+    }
+    for (int i = 0; i < filterExpressions.size(); i++) {
+      int leftPos = joinCondns[i].getLeft();
+      int rightPos = joinCondns[i].getRight();
+
+      for (ExprNodeDesc expr : filterExpressions.get(i)) {
+        // We need to update the exprNode, as currently
+        // they refer to columns in the output of the join;
+        // they should refer to the columns output by the RS
+        int inputPos = updateExprNode(expr, reversedExprs, colExprMap);
+        if (inputPos == -1) {
+          inputPos = leftPos;
+        }
+        filtersPerInput.get(inputPos).add(expr);
+
+        if (joinCondns[i].getType() == JoinDesc.FULL_OUTER_JOIN ||
+                joinCondns[i].getType() == JoinDesc.LEFT_OUTER_JOIN ||
+                joinCondns[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) {
+          if (inputPos == leftPos) {
+            updateFilterMap(filterMap, leftPos, rightPos);
+          } else {
+            updateFilterMap(filterMap, rightPos, leftPos);
+          }
+        }
+      }
+    }
+    for (int pos = 0; pos < children.size(); pos++) {
+      ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos);
+      ReduceSinkDesc rsDesc = inputRS.getConf();
+      Byte tag = (byte) rsDesc.getTag();
+      filters.put(tag, filtersPerInput.get(pos));
+    }
+
+    // 4. We create the join operator with its descriptor
+    JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, filters, joinExpressions, null);
+    desc.setReversedExprs(reversedExprs);
+    desc.setFilterMap(filterMap);
+
+    JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(
+        childOps[0].getCompilationOpContext(), desc, new RowSchema(outputColumns), childOps);
+    joinOp.setColumnExprMap(colExprMap);
+    joinOp.setPosToAliasMap(posToAliasMap);
+    joinOp.getConf().setBaseSrc(baseSrc);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + joinOp + " with row schema: [" + joinOp.getSchema() + "]");
+    }
+
+    return joinOp;
+  }
+
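+  /*
+   * Creates the backtracked column descriptors for one join input, mapping
+   * each output column name to the KEY/VALUE column emitted by its ReduceSink.
+   */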
+  private Map<String, ExprNodeDesc> buildBacktrackFromReduceSinkForJoin(int initialPos, List<String> outputColumnNames,
+      List<String> keyColNames, List<String> valueColNames, int[] index, Operator<?> inputOp, String tabAlias) {
+    Map<String, ExprNodeDesc> columnDescriptors = new LinkedHashMap<String, ExprNodeDesc>();
+    for (int i = 0; i < index.length; i++) {
+      ColumnInfo info = new ColumnInfo(inputOp.getSchema().getSignature().get(i));
+      String field;
+      if (index[i] >= 0) {
+        field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
+      } else {
+        field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1);
+      }
+      ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), field, tabAlias,
+          info.getIsVirtualCol());
+      columnDescriptors.put(outputColumnNames.get(initialPos + i), desc);
+    }
+    return columnDescriptors;
+  }
+
+  /*
+   * This method updates the input expr, changing all the
+   * ExprNodeColumnDesc in it to refer to columns given by the
+   * colExprMap.
+   *
+   * For instance, "col_0 = 1" would become "VALUE.col_0 = 1";
+   * the execution engine expects filters in the Join operators
+   * to be expressed that way.
+   */
+  private int updateExprNode(ExprNodeDesc expr, final Map<String, Byte> reversedExprs,
+      final Map<String, ExprNodeDesc> colExprMap) throws SemanticException {
+    int inputPos = -1;
+    if (expr instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) expr;
+      List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>();
+      for (ExprNodeDesc functionChild : func.getChildren()) {
+        if (functionChild instanceof ExprNodeColumnDesc) {
+          String colRef = functionChild.getExprString();
+          int pos = reversedExprs.get(colRef);
+          if (pos != -1) {
+            if (inputPos == -1) {
+              inputPos = pos;
+            } else if (inputPos != pos) {
+              throw new SemanticException(
+                  "updateExprNode expects a single input position for join operator conversion, but found more than one.");
+            }
+          }
+          newChildren.add(colExprMap.get(colRef));
+        } else {
+          int pos = updateExprNode(functionChild, reversedExprs, colExprMap);
+          if (pos != -1) {
+            if (inputPos == -1) {
+              inputPos = pos;
+            } else if (inputPos != pos) {
+              throw new SemanticException(
+                  "updateExprNode expects a single input position for join operator conversion, but found more than one.");
+            }
+          }
+          newChildren.add(functionChild);
+        }
+      }
+      func.setChildren(newChildren);
+    }
+    return inputPos;
+  }
+
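+  /*
+   * filterMap[inputPos] stores (joinPos, count) pairs in consecutive slots:
+   * even slots hold the position of the other join input, odd slots count the
+   * filters registered against it.
+   */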
+  private void updateFilterMap(int[][] filterMap, int inputPos, int joinPos) {
+    int[] map = filterMap[inputPos];
+    if (map == null) {
+      filterMap[inputPos] = new int[2];
+      filterMap[inputPos][0] = joinPos;
+      filterMap[inputPos][1]++;
+    } else {
+      boolean inserted = false;
+      for (int j = 0; j < map.length / 2 && !inserted; j++) {
+        if (map[j * 2] == joinPos) {
+          map[j * 2 + 1]++;
+          inserted = true;
+        }
+      }
+      if (!inserted) {
+        int[] newMap = new int[map.length + 2];
+        System.arraycopy(map, 0, newMap, 0, map.length);
+        newMap[map.length] = joinPos;
+        newMap[map.length + 1]++;
+        filterMap[inputPos] = newMap;
+      }
+    }
+  }
+
+  private JoinType transformJoinType(JoinRelType type) {
+    JoinType resultJoinType;
+    switch (type) {
+    case FULL:
+      resultJoinType = JoinType.FULLOUTER;
+      break;
+    case LEFT:
+      resultJoinType = JoinType.LEFTOUTER;
+      break;
+    case RIGHT:
+      resultJoinType = JoinType.RIGHTOUTER;
+      break;
+    default:
+      resultJoinType = JoinType.INNER;
+      break;
+    }
+    return resultJoinType;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/package-info.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/package-info.java
new file mode 100644
index 0000000..f723698
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Operator converter for the return path. */
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 2b9caac..ea5fa3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -227,13 +227,13 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveNoAggregateIn
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.MaterializedViewRewritingRelVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTBuilder;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
-import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinTypeCheckCtx;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.PlanModifierForReturnPath;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;