You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by mi...@apache.org on 2017/11/30 18:13:21 UTC

hive git commit: HIVE-14792: AvroSerde reads the remote schema-file at least once per mapper, per table reference. (Mithun Radhakrishnan, reviewed by Aihua Xu)

Repository: hive
Updated Branches:
  refs/heads/master 2efb7d38a -> 7dfbbd89d


HIVE-14792: AvroSerde reads the remote schema-file at least once per mapper, per table reference. (Mithun Radhakrishnan, reviewed by Aihua Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7dfbbd89
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7dfbbd89
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7dfbbd89

Branch: refs/heads/master
Commit: 7dfbbd89de07bd36aa135f3b3101729f231ed4bf
Parents: 2efb7d3
Author: Mithun RK <mi...@apache.org>
Authored: Thu Nov 30 10:12:11 2017 -0800
Committer: Mithun RK <mi...@apache.org>
Committed: Thu Nov 30 10:12:34 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 ++
 .../hadoop/hive/ql/optimizer/Optimizer.java     |   4 +
 .../TablePropertyEnrichmentOptimizer.java       | 125 +++++++++++++++++++
 .../hadoop/hive/serde2/avro/AvroSerDe.java      |   6 +-
 4 files changed, 144 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0cc8de0..ada2318 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1695,6 +1695,16 @@ public class HiveConf extends Configuration {
         "Whether to remove an extra join with sq_count_check for scalar subqueries "
             + "with constant group by keys."),
 
+    HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE("hive.optimize.update.table.properties.from.serde", false,
+        "Whether to update table-properties by initializing tables' SerDe instances during logical-optimization.\n" +
+            "By doing so, certain SerDe classes (like AvroSerDe) can pre-calculate table-specific information, and\n" +
+            "store it in table-properties, to be used later in the SerDe, while running the job."),
+
+    HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST("hive.optimize.update.table.properties.from.serde.list",
+        "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
+        "The comma-separated list of fully-qualified SerDe class names that are considered when\n" +
+            "enhancing table-properties during logical optimization."),
+
     // CTE
     HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1,
         "If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index e1340c7..bdff57c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -236,6 +236,10 @@ public class Optimizer {
       transformations.add(new SimpleFetchAggregation());
     }
 
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE)) {
+      transformations.add(new TablePropertyEnrichmentOptimizer()); // SerDe-derived table-properties
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java
new file mode 100644
index 0000000..98acb0d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * Optimizer that updates TableScanOperators' Table-references with properties that might be
+ * updated/pre-fetched by initializing the table's SerDe.
+ * E.g. AvroSerDes can now prefetch schemas from schema-urls and update the table-properties directly.
+ * <p>
+ * This is best-effort: if a SerDe cannot be loaded or initialized, the failure is logged and the
+ * table-properties are left unchanged.
+ */
+class TablePropertyEnrichmentOptimizer extends Transform {
+
+  private static final Log LOG = LogFactory.getLog(TablePropertyEnrichmentOptimizer.class);
+
+  /** Walker state: the compile-time configuration and the SerDe class names eligible for enrichment. */
+  private static class WalkerCtx implements NodeProcessorCtx {
+
+    final Configuration conf;
+    final Set<String> serdeClassesUnderConsideration = Sets.newHashSet();
+
+    WalkerCtx(Configuration conf) {
+      this.conf = conf;
+      // Trim entries and drop empty ones, so that e.g. "a.B, c.D," matches both classes.
+      String classList =
+          HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST);
+      for (String className : classList.split(",")) {
+        String trimmed = className.trim();
+        if (!trimmed.isEmpty()) {
+          serdeClassesUnderConsideration.add(trimmed);
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("TablePropertyEnrichmentOptimizer considers these SerDe classes:");
+        for (String className : serdeClassesUnderConsideration) {
+          LOG.debug(className);
+        }
+      }
+    }
+  }
+
+  /**
+   * For each scanned table whose SerDe class is under consideration, initializes a fresh instance of
+   * that SerDe against a copy of the table's full metadata, and copies every property the SerDe
+   * added or changed back into the table's parameters.
+   */
+  private static class Processor implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+        throws SemanticException {
+      TableScanOperator tsOp = (TableScanOperator) nd;
+      WalkerCtx context = (WalkerCtx) procCtx;
+
+      TableScanDesc tableScanDesc = tsOp.getConf();
+      if (tableScanDesc == null || tableScanDesc.getTableMetadata() == null) {
+        LOG.debug("Skipping TableScanOperator without table metadata: " + tsOp);
+        return nd;
+      }
+
+      Table table = tableScanDesc.getTableMetadata().getTTable();
+      String deserializerClassName = getSerDeClassName(table);
+      if (deserializerClassName == null
+          || !context.serdeClassesUnderConsideration.contains(deserializerClassName)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping prefetch for " + deserializerClassName);
+        }
+        return nd;
+      }
+
+      try {
+        // Hand the SerDe the full table metadata (columns, SerDe properties and table parameters);
+        // table parameters alone can make it resolve a different schema, or an error schema.
+        Properties originalProperties = tableScanDesc.getTableMetadata().getMetadata();
+        Properties enrichedProperties = new Properties();
+        enrichedProperties.putAll(originalProperties);
+
+        // Use a private instance: re-initializing the Table's cached deserializer could leave it
+        // partially reset if initialization failed.
+        Deserializer deserializer = ReflectionUtils.newInstance(
+            context.conf.getClassByName(deserializerClassName).asSubclass(Deserializer.class),
+            context.conf);
+        deserializer.initialize(context.conf, enrichedProperties);
+        LOG.debug("SerDe init succeeded for class: " + deserializerClassName);
+
+        // Collect changes first, so the table is only modified after the whole diff succeeded.
+        Map<String, String> changedProperties = Maps.newHashMap();
+        for (String key : enrichedProperties.stringPropertyNames()) {
+          String value = enrichedProperties.getProperty(key);
+          if (!value.equals(originalProperties.getProperty(key))) {
+            LOG.debug("Resolving changed parameters! key=" + key + ", value=" + value);
+            changedProperties.put(key, value);
+          }
+        }
+
+        if (!changedProperties.isEmpty()) {
+          if (table.getParameters() == null) {
+            table.setParameters(Maps.<String, String>newHashMap());
+          }
+          table.getParameters().putAll(changedProperties);
+        }
+      } catch (Exception | LinkageError e) {
+        // Best-effort: on any load/init failure, leave the table-properties unchanged.
+        LOG.error("SerDe init failed for SerDe class==" + deserializerClassName
+            + ". Didn't change table-properties", e);
+      }
+
+      return nd;
+    }
+
+    /** Returns the table's serialization-library class name, or null if the table has none. */
+    private static String getSerDeClassName(Table table) {
+      if (table.getSd() == null || table.getSd().getSerdeInfo() == null) {
+        return null;
+      }
+      return table.getSd().getSerdeInfo().getSerializationLib();
+    }
+  }
+
+  /**
+   * Walks the operator tree from the plan's top operators, running {@link Processor} on every
+   * TableScanOperator.
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    LOG.info("TablePropertyEnrichmentOptimizer::transform().");
+
+    Map<Rule, NodeProcessor> opRules = Maps.newLinkedHashMap();
+    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+        new Processor());
+
+    WalkerCtx context = new WalkerCtx(pctx.getConf());
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
+
+    List<Node> topNodes = Lists.newArrayList();
+    topNodes.addAll(pctx.getTopOps().values());
+
+    GraphWalker walker = new PreOrderWalker(disp);
+    walker.startWalking(topNodes, null);
+
+    LOG.info("TablePropertyEnrichmentOptimizer::transform() complete!");
+    return pctx;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 5467d8a..1746a0f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -90,6 +90,11 @@ public class AvroSerDe extends AbstractSerDe {
       LOG.debug("Resetting already initialized AvroSerDe");
     }
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AvroSerde::initialize(): Preset value of avro.schema.literal == "
+          + properties.get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+    }
+
     schema = null;
     oi = null;
     columnNames = null;
@@ -112,9 +117,14 @@ public class AvroSerDe extends AbstractSerDe {
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
 
       schema = getSchemaFromCols(properties, columnNames, columnTypes, columnCommentProperty);
-      properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString());
     }
 
+    // Publish the resolved schema as the literal so it can be reused without re-reading a schema-url,
+    // but never the error-signalling schema: whatever persists these properties would pin the failure.
+    if (schema != SchemaResolutionProblem.SIGNAL_BAD_SCHEMA) {
+      properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString());
+    }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Avro schema is " + schema);
     }