You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/04/27 22:54:01 UTC

[GitHub] [hive] jcamachor commented on a change in pull request #994: HIVE-21304 bucketing version

jcamachor commented on a change in pull request #994:
URL: https://github.com/apache/hive/pull/994#discussion_r416190205



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
##########
@@ -800,6 +800,7 @@ public static TableDesc getTableDesc(String cols, String colTypes) {
         serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
         serdeConstants.LIST_COLUMNS, cols,
         serdeConstants.LIST_COLUMN_TYPES, colTypes,
+            hive_metastoreConstants.TABLE_BUCKETING_VERSION, "-1",

Review comment:
       Nit: the indentation of the added line is off.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketVersionPopulator.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
+import org.apache.hadoop.hive.ql.lib.SemanticGraphWalker;
+import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
+import org.apache.hadoop.hive.ql.lib.SemanticRule;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class BucketVersionPopulator extends Transform {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BucketVersionPopulator.class);
+
+  @Deprecated
+

Review comment:
       Nit: stray blank line after the annotation.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
##########
@@ -350,6 +351,7 @@ public static AbstractSerDe getSpillSerDe(byte alias, TableDesc[] spillTableDesc
           + Utilities.ctrlaCode,
           org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, colNames
           .toString(),
+              hive_metastoreConstants.TABLE_BUCKETING_VERSION, "-1",

Review comment:
       Nit: the indentation of the added line is off.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketVersionPopulator.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
+import org.apache.hadoop.hive.ql.lib.SemanticGraphWalker;
+import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
+import org.apache.hadoop.hive.ql.lib.SemanticRule;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class BucketVersionPopulator extends Transform {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BucketVersionPopulator.class);
+
+  @Deprecated
+
+  protected ParseContext pGraphContext;
+
+  static class BucketingVersionResult {
+    Integer bucketingVersion;
+
+    public BucketingVersionResult(Integer version) {
+      bucketingVersion = version;
+    }
+
+    public BucketingVersionResult merge(BucketingVersionResult r) throws SemanticException {
+      if (bucketingVersion == r.bucketingVersion || r.bucketingVersion == -1) {
+        return new BucketingVersionResult(bucketingVersion);
+      }
+      if (bucketingVersion == -1) {
+        return new BucketingVersionResult(r.bucketingVersion);
+      }
+      throw new SemanticException("invalid state; can't set bucketingVersion correctly");
+    }
+
+    public BucketingVersionResult merge2(BucketingVersionResult r) {
+      if (bucketingVersion == r.bucketingVersion || r.bucketingVersion == -1) {
+        return new BucketingVersionResult(bucketingVersion);
+      }
+      return new BucketingVersionResult(2);
+    }
+  }
+
+  @Deprecated
+  Set<OpGroup> groups = new HashSet<BucketVersionPopulator.OpGroup>();
+
+  Map<Operator<?>, OpGroup> b = new IdentityHashMap<>();
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+    findOpGroups();
+    assignGroupVersions();
+    return pctx;
+  }
+
+  private void assignGroupVersions() {
+    Set<OpGroup> g = groups;
+    for (OpGroup opGroup : g) {
+      opGroup.analyzeBucketVersion();
+      opGroup.setBucketVersion();
+    }
+
+  }
+
+  private ParseContext findOpGroups() throws SemanticException {
+
+    NodeProcessorCtx ctx = new NodeProcessorCtx() {
+    };
+
+    Map<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
+
+    SemanticDispatcher disp = new DefaultRuleDispatcher(new SetPreferredBucketingVersionRule(), opRules, ctx);
+    //    SemanticGraphWalker ogw = new PreOrderWalker(disp);

Review comment:
       Can this commented-out line be deleted?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketVersionPopulator.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
+import org.apache.hadoop.hive.ql.lib.SemanticGraphWalker;
+import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
+import org.apache.hadoop.hive.ql.lib.SemanticRule;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class BucketVersionPopulator extends Transform {

Review comment:
       Can we add a comment here describing what this transformation class is doing? Additionally, please add some comments in the internal classes.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
##########
@@ -811,32 +809,32 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
     // tables and version 2 for new tables. All the inputs to the SMB must be
     // from same version. This only applies to tables read directly and not
     // intermediate outputs of joins/groupbys
-    int bucketingVersion = -1;
-    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
-      // Check if the parent is coming from a table scan, if so, what is the version of it.
-      assert parentOp.getParentOperators() != null && parentOp.getParentOperators().size() == 1;
-      Operator<?> op = parentOp.getParentOperators().get(0);
-      while(op != null && !(op instanceof TableScanOperator
-              || op instanceof ReduceSinkOperator
-              || op instanceof CommonJoinOperator)) {
-        // If op has parents it is guaranteed to be 1.
-        List<Operator<?>> parents = op.getParentOperators();
-        Preconditions.checkState(parents.size() == 0 || parents.size() == 1);
-        op = parents.size() == 1 ? parents.get(0) : null;
-      }
-
-      if (op instanceof TableScanOperator) {
-        int localVersion = ((TableScanOperator)op).getConf().
-                getTableMetadata().getBucketingVersion();
-        if (bucketingVersion == -1) {
-          bucketingVersion = localVersion;
-        } else if (bucketingVersion != localVersion) {
-          // versions dont match, return false.
-          LOG.debug("SMB Join can't be performed due to bucketing version mismatch");
-          return false;
-        }
-      }
-    }
+    //    int bucketingVersion = -1;

Review comment:
       Note: this commented-out code can be deleted.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketVersionPopulator.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
+import org.apache.hadoop.hive.ql.lib.SemanticGraphWalker;
+import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
+import org.apache.hadoop.hive.ql.lib.SemanticRule;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class BucketVersionPopulator extends Transform {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BucketVersionPopulator.class);
+
+  @Deprecated
+
+  protected ParseContext pGraphContext;
+
+  static class BucketingVersionResult {
+    Integer bucketingVersion;
+
+    public BucketingVersionResult(Integer version) {
+      bucketingVersion = version;
+    }
+
+    public BucketingVersionResult merge(BucketingVersionResult r) throws SemanticException {
+      if (bucketingVersion == r.bucketingVersion || r.bucketingVersion == -1) {
+        return new BucketingVersionResult(bucketingVersion);
+      }
+      if (bucketingVersion == -1) {
+        return new BucketingVersionResult(r.bucketingVersion);
+      }
+      throw new SemanticException("invalid state; can't set bucketingVersion correctly");
+    }
+
+    public BucketingVersionResult merge2(BucketingVersionResult r) {
+      if (bucketingVersion == r.bucketingVersion || r.bucketingVersion == -1) {
+        return new BucketingVersionResult(bucketingVersion);
+      }
+      return new BucketingVersionResult(2);
+    }
+  }
+
+  @Deprecated
+  Set<OpGroup> groups = new HashSet<BucketVersionPopulator.OpGroup>();
+
+  Map<Operator<?>, OpGroup> b = new IdentityHashMap<>();
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+    findOpGroups();
+    assignGroupVersions();
+    return pctx;
+  }
+
+  private void assignGroupVersions() {
+    Set<OpGroup> g = groups;
+    for (OpGroup opGroup : g) {
+      opGroup.analyzeBucketVersion();
+      opGroup.setBucketVersion();
+    }
+
+  }
+
+  private ParseContext findOpGroups() throws SemanticException {
+
+    NodeProcessorCtx ctx = new NodeProcessorCtx() {
+    };
+
+    Map<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
+
+    SemanticDispatcher disp = new DefaultRuleDispatcher(new SetPreferredBucketingVersionRule(), opRules, ctx);
+    //    SemanticGraphWalker ogw = new PreOrderWalker(disp);
+    SemanticGraphWalker ogw = new DefaultGraphWalker(disp);
+
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pGraphContext.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pGraphContext;
+  }
+
+  class OpGroup {
+    Set<Operator<?>> members = Sets.newIdentityHashSet();
+    int version = -1;
+
+    public OpGroup() {
+      groups.add(this);
+    }
+
+    public void add(Operator o) {
+      members.add(o);
+      b.put(o, this);
+    }
+
+    public void setBucketVersion() {
+      for (Operator<?> operator : members) {
+        operator.getConf().setBucketingVersion(version);
+      }
+    }
+
+    class OperatorBucketingVersionInfo {
+
+      private Operator<?> op;
+      private int bucketingVersion;
+
+      public OperatorBucketingVersionInfo(Operator<?> op, int bucketingVersion) {
+        this.op = op;
+        this.bucketingVersion = bucketingVersion;
+      }
+
+      @Override
+      public String toString() {
+        return String.format("[op: %s, bucketingVersion=%d]", op, bucketingVersion);
+      }
+    }
+
+    List<OperatorBucketingVersionInfo> getBucketingVersions() {
+      List<OperatorBucketingVersionInfo> ret = new ArrayList<>();
+      for (Operator<?> operator : members) {
+        if (operator instanceof TableScanOperator) {
+          TableScanOperator tso = (TableScanOperator) operator;
+          int bucketingVersion = tso.getConf().getTableMetadata().getBucketingVersion();
+          int numBuckets = tso.getConf().getNumBuckets();
+          if (numBuckets > 1) {
+            ret.add(new OperatorBucketingVersionInfo(operator, bucketingVersion));
+          } else {
+            LOG.info("not considering bucketingVersion for: %s because it has %d<2 buckets ", tso, numBuckets);
+          }
+        }
+        if (operator instanceof FileSinkOperator) {
+          FileSinkOperator fso = (FileSinkOperator) operator;
+          int bucketingVersion = fso.getConf().getTableInfo().getBucketingVersion();
+          ret.add(new OperatorBucketingVersionInfo(operator, bucketingVersion));
+        }
+      }
+      return ret;
+    }
+
+    public void analyzeBucketVersion() {
+      List<OperatorBucketingVersionInfo> bucketingVersions = getBucketingVersions();
+      try {
+        for (OperatorBucketingVersionInfo info : bucketingVersions) {

Review comment:
       Nit: the indentation is off here.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketVersionPopulator.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
+import org.apache.hadoop.hive.ql.lib.SemanticGraphWalker;
+import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
+import org.apache.hadoop.hive.ql.lib.SemanticRule;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class BucketVersionPopulator extends Transform {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BucketVersionPopulator.class);
+
+  @Deprecated
+
+  protected ParseContext pGraphContext;
+
+  static class BucketingVersionResult {
+    Integer bucketingVersion;
+
+    public BucketingVersionResult(Integer version) {
+      bucketingVersion = version;
+    }
+
+    public BucketingVersionResult merge(BucketingVersionResult r) throws SemanticException {
+      if (bucketingVersion == r.bucketingVersion || r.bucketingVersion == -1) {
+        return new BucketingVersionResult(bucketingVersion);
+      }
+      if (bucketingVersion == -1) {
+        return new BucketingVersionResult(r.bucketingVersion);
+      }
+      throw new SemanticException("invalid state; can't set bucketingVersion correctly");
+    }
+
+    public BucketingVersionResult merge2(BucketingVersionResult r) {
+      if (bucketingVersion == r.bucketingVersion || r.bucketingVersion == -1) {
+        return new BucketingVersionResult(bucketingVersion);
+      }
+      return new BucketingVersionResult(2);
+    }
+  }
+
+  @Deprecated
+  Set<OpGroup> groups = new HashSet<BucketVersionPopulator.OpGroup>();
+
+  Map<Operator<?>, OpGroup> b = new IdentityHashMap<>();
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+    findOpGroups();
+    assignGroupVersions();
+    return pctx;
+  }
+
+  private void assignGroupVersions() {
+    Set<OpGroup> g = groups;
+    for (OpGroup opGroup : g) {
+      opGroup.analyzeBucketVersion();
+      opGroup.setBucketVersion();
+    }
+
+  }
+
+  private ParseContext findOpGroups() throws SemanticException {
+
+    NodeProcessorCtx ctx = new NodeProcessorCtx() {
+    };
+
+    Map<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
+
+    SemanticDispatcher disp = new DefaultRuleDispatcher(new SetPreferredBucketingVersionRule(), opRules, ctx);
+    //    SemanticGraphWalker ogw = new PreOrderWalker(disp);
+    SemanticGraphWalker ogw = new DefaultGraphWalker(disp);
+
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pGraphContext.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pGraphContext;
+  }
+
+  class OpGroup {
+    Set<Operator<?>> members = Sets.newIdentityHashSet();
+    int version = -1;
+
+    public OpGroup() {
+      groups.add(this);
+    }
+
+    public void add(Operator o) {
+      members.add(o);
+      b.put(o, this);
+    }
+
+    public void setBucketVersion() {
+      for (Operator<?> operator : members) {
+        operator.getConf().setBucketingVersion(version);
+      }
+    }
+
+    class OperatorBucketingVersionInfo {
+
+      private Operator<?> op;
+      private int bucketingVersion;
+
+      public OperatorBucketingVersionInfo(Operator<?> op, int bucketingVersion) {
+        this.op = op;
+        this.bucketingVersion = bucketingVersion;
+      }
+
+      @Override
+      public String toString() {
+        return String.format("[op: %s, bucketingVersion=%d]", op, bucketingVersion);
+      }
+    }
+
+    List<OperatorBucketingVersionInfo> getBucketingVersions() {
+      List<OperatorBucketingVersionInfo> ret = new ArrayList<>();
+      for (Operator<?> operator : members) {
+        if (operator instanceof TableScanOperator) {
+          TableScanOperator tso = (TableScanOperator) operator;
+          int bucketingVersion = tso.getConf().getTableMetadata().getBucketingVersion();
+          int numBuckets = tso.getConf().getNumBuckets();
+          if (numBuckets > 1) {
+            ret.add(new OperatorBucketingVersionInfo(operator, bucketingVersion));
+          } else {
+            LOG.info("not considering bucketingVersion for: %s because it has %d<2 buckets ", tso, numBuckets);
+          }
+        }
+        if (operator instanceof FileSinkOperator) {
+          FileSinkOperator fso = (FileSinkOperator) operator;
+          int bucketingVersion = fso.getConf().getTableInfo().getBucketingVersion();
+          ret.add(new OperatorBucketingVersionInfo(operator, bucketingVersion));
+        }
+      }
+      return ret;
+    }
+
+    public void analyzeBucketVersion() {
+      List<OperatorBucketingVersionInfo> bucketingVersions = getBucketingVersions();
+      try {
+        for (OperatorBucketingVersionInfo info : bucketingVersions) {
+        setVersion(info.bucketingVersion);
+      }
+      } catch (Exception e) {
+        throw new RuntimeException("Error setting bucketingVersion for group: " + bucketingVersions, e);
+      }
+      if (version == -1) {
+        // use version 2 if possible
+        version = 2;
+      }
+    }
+
+    private void setVersion(int newVersion) {
+      if (version == newVersion || newVersion == -1) {
+        return;
+      }
+      if (version == -1) {
+        version = newVersion;

Review comment:
       Should we add a debug-level log line here showing which version is being set for which operator?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
##########
@@ -162,9 +160,9 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
 
       listBucketCols.add(bucketCols);
       OpTraits opTraits = new OpTraits(listBucketCols, numBuckets,
-              listBucketCols, numReduceSinks, bucketingVersion);
+          listBucketCols, numReduceSinks);
       rs.setOpTraits(opTraits);
-      rs.setBucketingVersion(bucketingVersion);
+//      rs.getConf().setBucketingVersion(bucketingVersion);

Review comment:
       Commented out code.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
##########
@@ -243,15 +248,7 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     markOperatorsWithUnstableRuntimeStats(procCtx);
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "markOperatorsWithUnstableRuntimeStats");
 
-    // ATTENTION : DO NOT, I REPEAT, DO NOT WRITE ANYTHING AFTER updateBucketingVersionForUpgrade()
-    // ANYTHING WHICH NEEDS TO BE ADDED MUST BE ADDED ABOVE
-    // This call updates the bucketing version of final ReduceSinkOp based on
-    // the bucketing version of FileSinkOp. This operation must happen at the
-    // end to ensure there is no further rewrite of plan which may end up
-    // removing/updating the ReduceSinkOp as was the case with SortedDynPartitionOptimizer
-    // Update bucketing version of ReduceSinkOp if needed
-    updateBucketingVersionForUpgrade(procCtx);
-
+    bucketingVersionSanityCheck(procCtx);

Review comment:
       +100
   
   Can we run this check only in test mode? Or limit its scope to certain types of queries? We should try to avoid doing an additional traversal of the plan for every query.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
##########
@@ -46,6 +47,8 @@
    */
   protected Map<String, ExprNodeDesc> colExprMap;
 
+  private String myName = "N/A";

Review comment:
       This could be static final.
   
   Is it actually used, though?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
##########
@@ -2111,8 +2108,22 @@ private void updateBucketingVersionForUpgrade(OptimizeTezProcContext procCtx) {
           continue;
         }
 
-        // Found the target RSOp
-        parent.setBucketingVersion(fsOp.getConf().getTableInfo().getBucketingVersion());
+        // Found the target RSOp 0
+        int bucketingVersion = fsOp.getConf().getTableInfo().getBucketingVersion();
+        if (fsOp.getConf().getTableInfo().getBucketingVersion() == -1) {
+          break;
+        }
+        if (fsOp.getConf().getTableInfo().getBucketingVersion() != fsOp.getConf().getBucketingVersion()) {
+          throw new RuntimeException("FsOp bucketingVersions is inconsistent with its tableinfo");
+        }
+        if (processedOperators.containsKey(parent) && processedOperators.get(parent) != bucketingVersion) {
+          throw new SemanticException(String.format(
+              "Operator (%s) is already processed and is using bucketingVersion(%d); so it can't be changed to %d ",
+              parent, processedOperators.get(parent), bucketingVersion));
+        }
+        processedOperators.put(parent, bucketingVersion);
+
+        //parent.getConf().setBucketingVersion(bucketingVersion);

Review comment:
       Commented out code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org