You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mi...@apache.org on 2017/11/30 18:13:21 UTC
hive git commit: HIVE-14792: AvroSerde reads the remote schema-file
at least once per mapper, per table reference. (Mithun Radhakrishnan,
reviewed by Aihua Xu)
Repository: hive
Updated Branches:
refs/heads/master 2efb7d38a -> 7dfbbd89d
HIVE-14792: AvroSerde reads the remote schema-file at least once per mapper, per table reference. (Mithun Radhakrishnan, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7dfbbd89
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7dfbbd89
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7dfbbd89
Branch: refs/heads/master
Commit: 7dfbbd89de07bd36aa135f3b3101729f231ed4bf
Parents: 2efb7d3
Author: Mithun RK <mi...@apache.org>
Authored: Thu Nov 30 10:12:11 2017 -0800
Committer: Mithun RK <mi...@apache.org>
Committed: Thu Nov 30 10:12:34 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 10 ++
.../hadoop/hive/ql/optimizer/Optimizer.java | 4 +
.../TablePropertyEnrichmentOptimizer.java | 125 +++++++++++++++++++
.../hadoop/hive/serde2/avro/AvroSerDe.java | 6 +-
4 files changed, 144 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0cc8de0..ada2318 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1695,6 +1695,16 @@ public class HiveConf extends Configuration {
"Whether to remove an extra join with sq_count_check for scalar subqueries "
+ "with constant group by keys."),
+ HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE("hive.optimize.update.table.properties.from.serde", false,
+ "Whether to update table-properties by initializing tables' SerDe instances during logical-optimization. \n" +
+ "By doing so, certain SerDe classes (like AvroSerDe) can pre-calculate table-specific information, and \n" +
+ "store it in table-properties, to be used later in the SerDe, while running the job."),
+
+ HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST("hive.optimize.update.table.properties.from.serde.list",
+ "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
+ "The comma-separated list of SerDe classes that are considered when enhancing table-properties \n" +
+ "during logical optimization."),
+
// CTE
HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1,
"If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index e1340c7..bdff57c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -236,6 +236,10 @@ public class Optimizer {
transformations.add(new SimpleFetchAggregation());
}
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE)) {
+ transformations.add(new TablePropertyEnrichmentOptimizer());
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java
new file mode 100644
index 0000000..98acb0d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TablePropertyEnrichmentOptimizer.java
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hive.ql.optimizer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * Optimizer that updates TableScanOperators' Table-references with properties that might be
+ * updated/pre-fetched by initializing the table's SerDe.
+ * E.g. AvroSerDes can now prefetch schemas from schema-urls and update the table-properties directly.
+ *
+ * Only SerDe classes named in HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST are considered.
+ * The enrichment is best-effort: any failure is logged and the table's parameters are left as-is.
+ */
+class TablePropertyEnrichmentOptimizer extends Transform {
+
+  private static final Log LOG = LogFactory.getLog(TablePropertyEnrichmentOptimizer.class);
+
+  /**
+   * Walk context: the configuration used to initialize SerDes, plus the set of SerDe class
+   * names (read from HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST) eligible for enrichment.
+   */
+  private static class WalkerCtx implements NodeProcessorCtx {
+
+    final Configuration conf;
+    final Set<String> serdeClassesUnderConsideration = Sets.newHashSet();
+
+    WalkerCtx(Configuration conf) {
+      this.conf = conf;
+      String serdeList =
+          HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_TABLE_PROPERTIES_FROM_SERDE_LIST);
+      // Trim each entry so that "a, b" still matches class "b"; drop blanks from stray commas.
+      for (String className : serdeList.split(",")) {
+        String trimmed = className.trim();
+        if (!trimmed.isEmpty()) {
+          serdeClassesUnderConsideration.add(trimmed);
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("TablePropertyEnrichmentOptimizer considers these SerDe classes:");
+        for (String className : serdeClassesUnderConsideration) {
+          LOG.debug(className);
+        }
+      }
+    }
+  }
+
+  /**
+   * Visits one TableScanOperator. If its table's SerDe class is under consideration, initializes
+   * a fresh instance of that SerDe against the table's metadata and copies any property the SerDe
+   * added or changed back into the table's parameters. Always returns the visited node.
+   */
+  private static class Processor implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      TableScanOperator tsOp = (TableScanOperator) nd;
+      WalkerCtx context = (WalkerCtx) procCtx;
+      TableScanDesc tableScanDesc = tsOp.getConf();
+
+      String deserializerClassName = null;
+      // Everything is inside the try: this optimization is best-effort and must never fail
+      // query compilation.
+      try {
+        if (tableScanDesc == null || tableScanDesc.getTableMetadata() == null) {
+          LOG.debug("Skipping TableScanOperator without table metadata.");
+          return nd;
+        }
+
+        Deserializer tableDeserializer = tableScanDesc.getTableMetadata().getDeserializer();
+        deserializerClassName = tableDeserializer.getClass().getName();
+        if (!context.serdeClassesUnderConsideration.contains(deserializerClassName)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping prefetch for " + deserializerClassName);
+          }
+          return nd;
+        }
+
+        // Use the full table metadata (columns, column types, SerDe info, ...) together with the
+        // table parameters. Table parameters alone are insufficient: e.g. AvroSerDe derives its
+        // schema from the column list when neither avro.schema.url nor avro.schema.literal is set.
+        Properties originalProperties = tableScanDesc.getTableMetadata().getMetadata();
+        Properties enrichedProperties = new Properties();
+        enrichedProperties.putAll(originalProperties);
+
+        // Initialize a fresh SerDe rather than the table's cached instance, so the shared
+        // deserializer is neither re-initialized nor left half-initialized if init fails.
+        Deserializer deserializer =
+            ReflectionUtils.newInstance(tableDeserializer.getClass(), context.conf);
+        deserializer.initialize(context.conf, enrichedProperties);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SerDe init succeeded for class: " + deserializerClassName);
+        }
+
+        // Copy back only the properties the SerDe added or changed; everything else was already
+        // present in the table's metadata and must not leak into the table parameters.
+        Table table = tableScanDesc.getTableMetadata().getTTable();
+        for (String key : enrichedProperties.stringPropertyNames()) {
+          String value = enrichedProperties.getProperty(key);
+          if (!value.equals(originalProperties.getProperty(key))) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Resolving changed parameters! key=" + key + ", value=" + value);
+            }
+            if (table.getParameters() == null) {
+              table.setParameters(Maps.<String, String>newHashMap());
+            }
+            table.getParameters().put(key, value);
+          }
+        }
+      } catch (Throwable t) {
+        LOG.error("SerDe init failed for SerDe class==" + deserializerClassName
+            + ". Didn't change table-properties", t);
+      }
+
+      return nd;
+    }
+  }
+
+  /**
+   * Walks the operator tree from every top operator, enriching each TableScanOperator's table.
+   *
+   * @param pctx the current parse context; returned unchanged apart from the table parameters
+   *             updated in place on the scanned tables
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    LOG.info("TablePropertyEnrichmentOptimizer::transform().");
+
+    Map<Rule, NodeProcessor> opRules = Maps.newLinkedHashMap();
+    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+        new Processor());
+
+    WalkerCtx context = new WalkerCtx(pctx.getConf());
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
+
+    List<Node> topNodes = Lists.newArrayList();
+    topNodes.addAll(pctx.getTopOps().values());
+
+    GraphWalker walker = new PreOrderWalker(disp);
+    walker.startWalking(topNodes, null);
+
+    LOG.info("TablePropertyEnrichmentOptimizer::transform() complete!");
+    return pctx;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/7dfbbd89/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 5467d8a..1746a0f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -90,6 +90,9 @@ public class AvroSerDe extends AbstractSerDe {
LOG.debug("Resetting already initialized AvroSerDe");
}
+ LOG.info("AvroSerde::initialize(): Preset value of avro.schema.literal == "
+ + properties.get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+
schema = null;
oi = null;
columnNames = null;
@@ -112,9 +115,10 @@ public class AvroSerDe extends AbstractSerDe {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
schema = getSchemaFromCols(properties, columnNames, columnTypes, columnCommentProperty);
- properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString());
}
+ properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString());
+
if (LOG.isDebugEnabled()) {
LOG.debug("Avro schema is " + schema);
}