Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:15:02 UTC
[45/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java
deleted file mode 100644
index a00bf53..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/ColumnLineageGraph.java
+++ /dev/null
@@ -1,680 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-import com.cloudera.impala.thrift.TEdgeType;
-import com.cloudera.impala.thrift.TLineageGraph;
-import com.cloudera.impala.thrift.TMultiEdge;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TVertex;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
-/**
- * Represents a vertex in the column lineage graph. A Vertex may correspond to a base
- * table column, a column in the destination table (for INSERT or CTAS queries), or a
- * result expr (a labeled column of a query result set).
- */
-final class Vertex implements Comparable<Vertex> {
- // Unique identifier of this vertex.
- private final VertexId id_;
-
- private final String type_ = "COLUMN";
-
- // A fully-qualified column name or the label of a result expr
- private final String label_;
-
- public Vertex(VertexId id, String label) {
- Preconditions.checkNotNull(id);
- Preconditions.checkNotNull(label);
- id_ = id;
- label_ = label;
- }
- public VertexId getVertexId() { return id_; }
- public String getLabel() { return label_; }
- public String getType() { return type_; }
-
- @Override
- public String toString() { return "(" + id_ + ":" + type_ + ":" + label_ + ")"; }
-
- /**
- * Encodes this Vertex object into a JSON object represented by a Map.
- */
- public Map toJson() {
- // Use a LinkedHashMap to generate a strict ordering of elements.
- Map obj = new LinkedHashMap();
- obj.put("id", id_.asInt());
- obj.put("vertexType", type_);
- obj.put("vertexId", label_);
- return obj;
- }
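-
- // Shape sketch (invented label): encoding new Vertex(new VertexId(0), "t.a")
- // via JSONValue.toJSONString(v.toJson()) yields
- // {"id":0,"vertexType":"COLUMN","vertexId":"t.a"}.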
-
- /**
- * Constructs a Vertex object from a JSON object. The new object is returned.
- */
- public static Vertex fromJsonObj(JSONObject obj) {
- int id = ((Long) obj.get("id")).intValue();
- String label = (String) obj.get("vertexId");
- return new Vertex(new VertexId(id), label);
- }
-
- /**
- * Encodes this Vertex object into a thrift object
- */
- public TVertex toThrift() {
- return new TVertex(id_.asInt(), label_);
- }
-
- /**
- * Constructs a Vertex object from a thrift object.
- */
- public static Vertex fromThrift(TVertex vertex) {
- int id = ((Long) vertex.id).intValue();
- return new Vertex(new VertexId(id), vertex.label);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) return false;
- if (obj.getClass() != this.getClass()) return false;
- Vertex vertex = (Vertex) obj;
- return this.id_.equals(vertex.id_) &&
- this.label_.equals(vertex.label_);
- }
-
- public int compareTo(Vertex cmp) { return this.id_.compareTo(cmp.id_); }
-
- @Override
- public int hashCode() { return id_.hashCode(); }
-}
-
-/**
- * Represents the unique identifier of a Vertex.
- */
-class VertexId extends Id<VertexId> {
- protected VertexId(int id) {
- super(id);
- }
- public static IdGenerator<VertexId> createGenerator() {
- return new IdGenerator<VertexId>() {
- @Override
- public VertexId getNextId() { return new VertexId(nextId_++); }
- @Override
- public VertexId getMaxId() { return new VertexId(nextId_ - 1); }
- };
- }
-}
-
-/**
- * Represents a set of uni-directional edges in the column lineage graph, one edge from
- * every source Vertex in 'sources_' to every target Vertex in 'targets_'. An edge
- * indicates a dependency between a source and a target Vertex. There are two types of
- * edges, PROJECTION and PREDICATE, that are described in the ColumnLineageGraph class.
- */
-final class MultiEdge {
- public static enum EdgeType {
- PROJECTION, PREDICATE
- }
- private final Set<Vertex> sources_;
- private final Set<Vertex> targets_;
- private final EdgeType edgeType_;
-
- public MultiEdge(Set<Vertex> sources, Set<Vertex> targets, EdgeType type) {
- sources_ = sources;
- targets_ = targets;
- edgeType_ = type;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- Joiner joiner = Joiner.on(",");
- builder.append("Sources: [");
- builder.append(joiner.join(sources_) + "]\n");
- builder.append("Targets: [");
- builder.append(joiner.join(targets_) + "]\n");
- builder.append("Type: " + edgeType_);
- return builder.toString();
- }
-
- /**
- * Encodes this MultiEdge object to a JSON object represented by a Map.
- */
- public Map toJson() {
- Map obj = new LinkedHashMap();
- // Add sources
- JSONArray sourceIds = new JSONArray();
- for (Vertex vertex: sources_) {
- sourceIds.add(vertex.getVertexId());
- }
- obj.put("sources", sourceIds);
- // Add targets
- JSONArray targetIds = new JSONArray();
- for (Vertex vertex: targets_) {
- targetIds.add(vertex.getVertexId());
- }
- obj.put("targets", targetIds);
- obj.put("edgeType", edgeType_.toString());
- return obj;
- }
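-
- // Shape sketch (invented ids): a PROJECTION edge from source vertices 1, 2 to
- // target vertex 0 encodes to roughly
- // {"sources":[1,2],"targets":[0],"edgeType":"PROJECTION"}; element order
- // follows HashSet iteration and is not guaranteed.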
-
- /**
- * Encodes this MultiEdge object to a thrift object
- */
- public TMultiEdge toThrift() {
- List<TVertex> sources = Lists.newArrayList();
- for (Vertex vertex: sources_) {
- sources.add(vertex.toThrift());
- }
- List<TVertex> targets = Lists.newArrayList();
- for (Vertex vertex: targets_) {
- targets.add(vertex.toThrift());
- }
- if (edgeType_ == EdgeType.PROJECTION) {
- return new TMultiEdge(sources, targets, TEdgeType.PROJECTION);
- }
- return new TMultiEdge(sources, targets, TEdgeType.PREDICATE);
- }
-
- /**
- * Constructs a MultiEdge object from a thrift object
- */
- public static MultiEdge fromThrift(TMultiEdge obj){
- Set<Vertex> sources = Sets.newHashSet();
- for (TVertex vertex: obj.sources) {
- sources.add(Vertex.fromThrift(vertex));
- }
- Set<Vertex> targets = Sets.newHashSet();
- for (TVertex vertex: obj.targets) {
- targets.add(Vertex.fromThrift(vertex));
- }
- if (obj.edgetype == TEdgeType.PROJECTION) {
- return new MultiEdge(sources, targets, EdgeType.PROJECTION);
- }
- return new MultiEdge(sources, targets, EdgeType.PREDICATE);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) return false;
- if (obj.getClass() != this.getClass()) return false;
- MultiEdge edge = (MultiEdge) obj;
- return edge.sources_.equals(this.sources_) &&
- edge.targets_.equals(this.targets_) &&
- edge.edgeType_ == this.edgeType_;
- }
-}
-
-/**
- * Represents the column lineage graph of a query. This is a directed graph that is
- * used to track dependencies among the table/column entities that participate in
- * a query. There are two types of dependencies that are represented as edges in the
- * column lineage graph:
- * a) Projection dependency: This is a dependency between a set of source
- * columns (base table columns) and a single target (result expr or table column).
- * This dependency indicates that values of the target depend on the values of the source
- * columns.
- * b) Predicate dependency: This is a dependency between a set of target
- * columns (or exprs) and a set of source columns (base table columns). It indicates that
- * the source columns restrict the values of their targets (e.g. by participating in
- * WHERE clause predicates).
- *
- * The following dependencies are generated for a query:
- * - Exactly one projection dependency for every result expr / target column.
- * - Exactly one predicate dependency that targets all result exprs / target cols and
- * depends on all columns participating in a conjunct in the query.
- * - Special case of analytic fns: One predicate dependency per result expr / target col
- * whose value is directly or indirectly affected by an analytic function with a
- * partition by and/or order by clause.
- */
-public class ColumnLineageGraph {
- private final static Logger LOG = LoggerFactory.getLogger(ColumnLineageGraph.class);
- // Query statement
- private String queryStr_;
-
- // Name of the user that issued this query
- private String user_;
-
- private final List<Expr> resultDependencyPredicates_ = Lists.newArrayList();
-
- private final List<MultiEdge> edges_ = Lists.newArrayList();
-
- // Timestamp in seconds since epoch (GMT) this query was submitted for execution.
- private long timestamp_;
-
- // Map of Vertex labels to Vertex objects.
- private final Map<String, Vertex> vertices_ = Maps.newHashMap();
-
- // Map of Vertex ids to Vertex objects. Used primarily during the construction of the
- // ColumnLineageGraph from a serialized JSON object.
- private final Map<VertexId, Vertex> idToVertexMap_ = Maps.newHashMap();
-
- // For an INSERT or a CTAS, these are the columns of the
- // destination table plus any partitioning columns (when dynamic partitioning is used).
- // For a SELECT stmt, they are the labels of the result exprs.
- private final List<String> targetColumnLabels_ = Lists.newArrayList();
-
- // Repository for tuple and slot descriptors for this query. Use it to construct the
- // column lineage graph.
- private DescriptorTable descTbl_;
-
- private final IdGenerator<VertexId> vertexIdGenerator = VertexId.createGenerator();
-
- public ColumnLineageGraph() { }
-
- /**
- * Private c'tor, used only for testing.
- */
- private ColumnLineageGraph(String stmt, String user, long timestamp) {
- queryStr_ = stmt;
- user_ = user;
- timestamp_ = timestamp;
- }
-
- private void setVertices(Set<Vertex> vertices) {
- for (Vertex vertex: vertices) {
- vertices_.put(vertex.getLabel(), vertex);
- idToVertexMap_.put(vertex.getVertexId(), vertex);
- }
- }
-
- /**
- * Creates a new MultiEdge in the column lineage graph from the sets of 'sources' and
- * 'targets' labels (representing column names or result expr labels). The new
- * MultiEdge object is returned.
- */
- private MultiEdge createMultiEdge(Set<String> targets, Set<String> sources,
- MultiEdge.EdgeType type) {
- Set<Vertex> targetVertices = Sets.newHashSet();
- for (String target: targets) {
- targetVertices.add(createVertex(target));
- }
- Set<Vertex> sourceVertices = Sets.newHashSet();
- for (String source: sources) {
- sourceVertices.add(createVertex(source));
- }
- MultiEdge edge = new MultiEdge(sourceVertices, targetVertices, type);
- edges_.add(edge);
- return edge;
- }
-
- /**
- * Creates a new vertex in the column lineage graph. The new Vertex object is
- * returned. If a Vertex with the same label already exists, reuse it.
- */
- private Vertex createVertex(String label) {
- Vertex newVertex = vertices_.get(label);
- if (newVertex != null) return newVertex;
- newVertex = new Vertex(vertexIdGenerator.getNextId(), label);
- vertices_.put(newVertex.getLabel(), newVertex);
- idToVertexMap_.put(newVertex.getVertexId(), newVertex);
- return newVertex;
- }
-
- /**
- * Computes the column lineage graph of a query from the list of query result exprs.
- * 'rootAnalyzer' is the Analyzer that was used for the analysis of the query.
- */
- public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
- init(rootAnalyzer);
- computeProjectionDependencies(resultExprs);
- computeResultPredicateDependencies(rootAnalyzer);
- }
-
- /**
- * Initialize the ColumnLineageGraph from the root analyzer of a query.
- */
- private void init(Analyzer analyzer) {
- Preconditions.checkNotNull(analyzer);
- Preconditions.checkState(analyzer.isRootAnalyzer());
- TQueryCtx queryCtx = analyzer.getQueryCtx();
- if (queryCtx.request.isSetRedacted_stmt()) {
- queryStr_ = queryCtx.request.redacted_stmt;
- } else {
- queryStr_ = queryCtx.request.stmt;
- }
- Preconditions.checkNotNull(queryStr_);
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- try {
- timestamp_ = df.parse(queryCtx.now_string).getTime() / 1000;
- } catch (java.text.ParseException e) {
- LOG.error("Error parsing timestamp value: " + queryCtx.now_string +
- " " + e.getMessage());
- timestamp_ = new Date().getTime() / 1000;
- }
- descTbl_ = analyzer.getDescTbl();
- user_ = analyzer.getUser().getName();
- }
-
- private void computeProjectionDependencies(List<Expr> resultExprs) {
- Preconditions.checkNotNull(resultExprs);
- Preconditions.checkState(!resultExprs.isEmpty());
- Preconditions.checkState(resultExprs.size() == targetColumnLabels_.size());
- for (int i = 0; i < resultExprs.size(); ++i) {
- Expr expr = resultExprs.get(i);
- Set<String> sourceBaseCols = Sets.newHashSet();
- List<Expr> dependentExprs = Lists.newArrayList();
- getSourceBaseCols(expr, sourceBaseCols, dependentExprs, false);
- Set<String> targets = Sets.newHashSet(targetColumnLabels_.get(i));
- createMultiEdge(targets, sourceBaseCols, MultiEdge.EdgeType.PROJECTION);
- if (!dependentExprs.isEmpty()) {
- // We have additional exprs that 'expr' has a predicate dependency on.
- // Gather the transitive predicate dependencies of 'expr' based on its direct
- // predicate dependencies. For each direct predicate dependency p, 'expr' is
- // transitively predicate dependent on all exprs that p is projection and
- // predicate dependent on.
- Set<String> predicateBaseCols = Sets.newHashSet();
- for (Expr dependentExpr: dependentExprs) {
- getSourceBaseCols(dependentExpr, predicateBaseCols, null, true);
- }
- createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE);
- }
- }
- }
-
- /**
- * Compute predicate dependencies for the query result, i.e. exprs that affect the
- * possible values of the result exprs / target columns, such as predicates in a WHERE
- * clause.
- */
- private void computeResultPredicateDependencies(Analyzer analyzer) {
- List<Expr> conjuncts = analyzer.getConjuncts();
- for (Expr expr: conjuncts) {
- if (expr.isAuxExpr()) continue;
- resultDependencyPredicates_.add(expr);
- }
- Set<String> predicateBaseCols = Sets.newHashSet();
- for (Expr expr: resultDependencyPredicates_) {
- getSourceBaseCols(expr, predicateBaseCols, null, true);
- }
- if (predicateBaseCols.isEmpty()) return;
- Set<String> targets = Sets.newHashSet(targetColumnLabels_);
- createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE);
- }
-
- /**
- * Identify the base table columns that 'expr' is connected to by recursively resolving
- * all associated slots through inline views and materialization points to base-table
- * slots. If 'directPredDeps' is not null, it is populated with the exprs that
- * have a predicate dependency with 'expr' (e.g. partitioning and order by exprs for
- * the case of an analytic function). If 'traversePredDeps' is false, children of
- * 'expr' that are only predicate dependencies are not traversed when identifying the
- * base columns that 'expr' is connected to. Which children are skipped depends on the
- * type of 'expr' (e.g. for AnalyticExpr, the partitioning and ordering exprs are
- * skipped).
- */
- private void getSourceBaseCols(Expr expr, Set<String> sourceBaseCols,
- List<Expr> directPredDeps, boolean traversePredDeps) {
- List<Expr> exprsToTraverse = getProjectionDeps(expr);
- List<Expr> predicateDepExprs = getPredicateDeps(expr);
- if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs);
- if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs);
- List<SlotId> slotIds = Lists.newArrayList();
- for (Expr e: exprsToTraverse) {
- e.getIds(null, slotIds);
- }
- for (SlotId slotId: slotIds) {
- SlotDescriptor slotDesc = descTbl_.getSlotDesc(slotId);
- List<Expr> sourceExprs = slotDesc.getSourceExprs();
- if (sourceExprs.isEmpty() && slotDesc.isScanSlot() &&
- slotDesc.getPath().isRootedAtTuple()) {
- // slot should correspond to a materialized tuple of a table
- Preconditions.checkState(slotDesc.getParent().isMaterialized());
- List<String> path = slotDesc.getPath().getCanonicalPath();
- sourceBaseCols.add(Joiner.on(".").join(path));
- } else {
- for (Expr sourceExpr: sourceExprs) {
- getSourceBaseCols(sourceExpr, sourceBaseCols, directPredDeps,
- traversePredDeps);
- }
- }
- }
- }
-
- /**
- * Retrieve the exprs that 'e' is directly projection dependent on.
- * TODO Handle conditional exprs (e.g. CASE, IF).
- */
- private List<Expr> getProjectionDeps(Expr e) {
- Preconditions.checkNotNull(e);
- List<Expr> outputExprs = Lists.newArrayList();
- if (e instanceof AnalyticExpr) {
- AnalyticExpr analytic = (AnalyticExpr) e;
- outputExprs.addAll(analytic.getChildren().subList(0,
- analytic.getFnCall().getParams().size()));
- } else {
- outputExprs.add(e);
- }
- return outputExprs;
- }
-
- /**
- * Retrieve the exprs that 'e' is directly predicate dependent on.
- * TODO Handle conditional exprs (e.g. CASE, IF).
- */
- private List<Expr> getPredicateDeps(Expr e) {
- Preconditions.checkNotNull(e);
- List<Expr> outputExprs = Lists.newArrayList();
- if (e instanceof AnalyticExpr) {
- AnalyticExpr analyticExpr = (AnalyticExpr) e;
- outputExprs.addAll(analyticExpr.getPartitionExprs());
- for (OrderByElement orderByElem: analyticExpr.getOrderByElements()) {
- outputExprs.add(orderByElem.getExpr());
- }
- }
- return outputExprs;
- }
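-
- // Example (invented expr): for SUM(x) OVER (PARTITION BY y ORDER BY z),
- // getProjectionDeps() returns {x} (the function call's params) and
- // getPredicateDeps() returns {y, z} (the partitioning and ordering exprs).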
-
- public void addDependencyPredicates(Collection<Expr> exprs) {
- resultDependencyPredicates_.addAll(exprs);
- }
-
- /**
- * Encodes the ColumnLineageGraph object to JSON.
- */
- public String toJson() {
- if (Strings.isNullOrEmpty(queryStr_)) return "";
- Map obj = new LinkedHashMap();
- obj.put("queryText", queryStr_);
- obj.put("hash", getQueryHash(queryStr_));
- obj.put("user", user_);
- obj.put("timestamp", timestamp_);
- // Add edges
- JSONArray edges = new JSONArray();
- for (MultiEdge edge: edges_) {
- edges.add(edge.toJson());
- }
- obj.put("edges", edges);
- // Add vertices
- TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
- JSONArray vertices = new JSONArray();
- for (Vertex vertex: sortedVertices) {
- vertices.add(vertex.toJson());
- }
- obj.put("vertices", vertices);
- return JSONValue.toJSONString(obj);
- }
-
- /**
- * Serializes the ColumnLineageGraph to a thrift object
- */
- public TLineageGraph toThrift() {
- TLineageGraph graph = new TLineageGraph();
- if (Strings.isNullOrEmpty(queryStr_)) return graph;
- graph.setQuery_text(queryStr_);
- graph.setHash(getQueryHash(queryStr_));
- graph.setUser(user_);
- graph.setStarted(timestamp_);
- // Add edges
- List<TMultiEdge> edges = Lists.newArrayList();
- for (MultiEdge edge: edges_) {
- edges.add(edge.toThrift());
- }
- graph.setEdges(edges);
- // Add vertices
- TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
- List<TVertex> vertices = Lists.newArrayList();
- for (Vertex vertex: sortedVertices) {
- vertices.add(vertex.toThrift());
- }
- graph.setVertices(vertices);
- return graph;
- }
-
- /**
- * Creates a LineageGraph object from a thrift object
- */
- public static ColumnLineageGraph fromThrift(TLineageGraph obj) {
- ColumnLineageGraph lineage =
- new ColumnLineageGraph(obj.query_text, obj.user, obj.started);
- TreeSet<Vertex> vertices = Sets.newTreeSet();
- for (TVertex vertex: obj.vertices) {
- vertices.add(Vertex.fromThrift(vertex));
- }
- lineage.setVertices(vertices);
- for (TMultiEdge edge: obj.edges) {
- MultiEdge e = MultiEdge.fromThrift(edge);
- lineage.edges_.add(e);
- }
- return lineage;
- }
-
- private String getQueryHash(String queryStr) {
- Hasher hasher = Hashing.md5().newHasher();
- hasher.putString(queryStr);
- return hasher.hash().toString();
- }
-
- /**
- * Creates a ColumnLineageGraph object from a serialized JSON record. The new
- * ColumnLineageGraph object is returned. Used only during testing.
- */
- public static ColumnLineageGraph createFromJSON(String json) {
- if (json == null || json.isEmpty()) return null;
- JSONParser parser = new JSONParser();
- Object obj = null;
- try {
- obj = parser.parse(json);
- } catch (ParseException e) {
- LOG.error("Error parsing serialized column lineage graph: " + e.getMessage());
- return null;
- }
- if (!(obj instanceof JSONObject)) return null;
- JSONObject jsonObj = (JSONObject) obj;
- String stmt = (String) jsonObj.get("queryText");
- String hash = (String) jsonObj.get("hash");
- String user = (String) jsonObj.get("user");
- long timestamp = (Long) jsonObj.get("timestamp");
- ColumnLineageGraph graph = new ColumnLineageGraph(stmt, user, timestamp);
- JSONArray serializedVertices = (JSONArray) jsonObj.get("vertices");
- Set<Vertex> vertices = Sets.newHashSet();
- for (int i = 0; i < serializedVertices.size(); ++i) {
- Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i));
- vertices.add(v);
- }
- graph.setVertices(vertices);
- JSONArray serializedEdges = (JSONArray) jsonObj.get("edges");
- for (int i = 0; i < serializedEdges.size(); ++i) {
- MultiEdge e =
- graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i));
- graph.edges_.add(e);
- }
- return graph;
- }
-
- private MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge) {
- Preconditions.checkNotNull(jsonEdge);
- JSONArray sources = (JSONArray) jsonEdge.get("sources");
- Set<Vertex> sourceVertices = getVerticesFromJSONArray(sources);
- JSONArray targets = (JSONArray) jsonEdge.get("targets");
- Set<Vertex> targetVertices = getVerticesFromJSONArray(targets);
- MultiEdge.EdgeType type =
- MultiEdge.EdgeType.valueOf((String) jsonEdge.get("edgeType"));
- return new MultiEdge(sourceVertices, targetVertices, type);
- }
-
- private Set<Vertex> getVerticesFromJSONArray(JSONArray vertexIdArray) {
- Set<Vertex> vertices = Sets.newHashSet();
- for (int i = 0; i < vertexIdArray.size(); ++i) {
- int sourceId = ((Long) vertexIdArray.get(i)).intValue();
- Vertex sourceVertex = idToVertexMap_.get(new VertexId(sourceId));
- Preconditions.checkNotNull(sourceVertex);
- vertices.add(sourceVertex);
- }
- return vertices;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) return false;
- if (obj.getClass() != this.getClass()) return false;
- ColumnLineageGraph g = (ColumnLineageGraph) obj;
- if (!this.vertices_.equals(g.vertices_) ||
- !this.edges_.equals(g.edges_)) {
- return false;
- }
- return true;
- }
-
- public String debugString() {
- StringBuilder builder = new StringBuilder();
- for (MultiEdge edge: edges_) {
- builder.append(edge.toString() + "\n");
- }
- builder.append(toJson());
- return builder.toString();
- }
-
- public void addTargetColumnLabels(Collection<String> columnLabels) {
- Preconditions.checkNotNull(columnLabels);
- targetColumnLabels_.addAll(columnLabels);
- }
-
- public void addTargetColumnLabels(Table dstTable) {
- Preconditions.checkNotNull(dstTable);
- String tblFullName = dstTable.getFullName();
- for (String columnName: dstTable.getColumnNames()) {
- targetColumnLabels_.add(tblFullName + "." + columnName);
- }
- }
-}
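
To make the dependency rules in the ColumnLineageGraph class comment concrete, here is
a hedged sketch (query, labels, and vertex ids all invented) of what toJson() would
emit for a simple filtered projection:

    // For: select a + b as s from t where c > 10
    // the rules yield one PROJECTION edge {t.a, t.b} -> {s} and one PREDICATE
    // edge {t.c} -> {s}. With vertex ids assigned in creation order (targets
    // before sources in createMultiEdge()), the JSON is roughly:
    // {"queryText":"select a + b as s from t where c > 10","hash":"...",
    //  "user":"...","timestamp":...,
    //  "edges":[{"sources":[1,2],"targets":[0],"edgeType":"PROJECTION"},
    //           {"sources":[3],"targets":[0],"edgeType":"PREDICATE"}],
    //  "vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"s"},
    //              {"id":1,"vertexType":"COLUMN","vertexId":"t.a"},
    //              {"id":2,"vertexType":"COLUMN","vertexId":"t.b"},
    //              {"id":3,"vertexType":"COLUMN","vertexId":"t.c"}]}
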
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java b/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java
deleted file mode 100644
index 4869004..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/CompoundPredicate.java
+++ /dev/null
@@ -1,216 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.cloudera.impala.catalog.Db;
-import com.cloudera.impala.catalog.Function.CompareMode;
-import com.cloudera.impala.catalog.ScalarFunction;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.thrift.TExprNode;
-import com.cloudera.impala.thrift.TExprNodeType;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * &&, ||, ! predicates.
- */
-public class CompoundPredicate extends Predicate {
- public enum Operator {
- AND("AND"),
- OR("OR"),
- NOT("NOT");
-
- private final String description;
-
- private Operator(String description) {
- this.description = description;
- }
-
- @Override
- public String toString() {
- return description;
- }
- }
- private final Operator op_;
-
- public static void initBuiltins(Db db) {
- // AND and OR are implemented as custom exprs, so they do not have a function symbol.
- db.addBuiltin(ScalarFunction.createBuiltinOperator(
- Operator.AND.name(), "",
- Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN));
- db.addBuiltin(ScalarFunction.createBuiltinOperator(
- Operator.OR.name(), "",
- Lists.<Type>newArrayList(Type.BOOLEAN, Type.BOOLEAN), Type.BOOLEAN));
- db.addBuiltin(ScalarFunction.createBuiltinOperator(
- Operator.NOT.name(), "impala::CompoundPredicate::Not",
- Lists.<Type>newArrayList(Type.BOOLEAN), Type.BOOLEAN));
- }
-
- public CompoundPredicate(Operator op, Expr e1, Expr e2) {
- super();
- this.op_ = op;
- Preconditions.checkNotNull(e1);
- children_.add(e1);
- Preconditions.checkArgument(op == Operator.NOT && e2 == null
- || op != Operator.NOT && e2 != null);
- if (e2 != null) children_.add(e2);
- }
-
- /**
- * Copy c'tor used in clone().
- */
- protected CompoundPredicate(CompoundPredicate other) {
- super(other);
- op_ = other.op_;
- }
-
- public Operator getOp() { return op_; }
-
- @Override
- public boolean equals(Object obj) {
- if (!super.equals(obj)) return false;
- return ((CompoundPredicate) obj).op_ == op_;
- }
-
- @Override
- public String debugString() {
- return Objects.toStringHelper(this)
- .add("op", op_)
- .addValue(super.debugString())
- .toString();
- }
-
- @Override
- public String toSqlImpl() {
- if (children_.size() == 1) {
- Preconditions.checkState(op_ == Operator.NOT);
- return "NOT " + getChild(0).toSql();
- } else {
- return getChild(0).toSql() + " " + op_.toString() + " " + getChild(1).toSql();
- }
- }
-
- @Override
- protected void toThrift(TExprNode msg) {
- msg.node_type = TExprNodeType.COMPOUND_PRED;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- if (isAnalyzed_) return;
- super.analyze(analyzer);
-
- // Check that children are predicates.
- for (Expr e: children_) {
- if (!e.getType().isBoolean() && !e.getType().isNull()) {
- throw new AnalysisException(String.format("Operand '%s' part of predicate " +
- "'%s' should return type 'BOOLEAN' but returns type '%s'.",
- e.toSql(), toSql(), e.getType().toSql()));
- }
- }
-
- fn_ = getBuiltinFunction(analyzer, op_.toString(), collectChildReturnTypes(),
- CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
- Preconditions.checkState(fn_ != null);
- Preconditions.checkState(fn_.getReturnType().isBoolean());
- castForFunctionCall(false);
- if (hasChildCosts()) evalCost_ = getChildCosts() + COMPOUND_PREDICATE_COST;
-
- if (!getChild(0).hasSelectivity() ||
- (children_.size() == 2 && !getChild(1).hasSelectivity())) {
- // Give up if one of our children has an unknown selectivity.
- selectivity_ = -1;
- return;
- }
-
- switch (op_) {
- case AND:
- selectivity_ = getChild(0).selectivity_ * getChild(1).selectivity_;
- break;
- case OR:
- selectivity_ = getChild(0).selectivity_ + getChild(1).selectivity_
- - getChild(0).selectivity_ * getChild(1).selectivity_;
- break;
- case NOT:
- selectivity_ = 1.0 - getChild(0).selectivity_;
- break;
- }
- selectivity_ = Math.max(0.0, Math.min(1.0, selectivity_));
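-
- // Worked example (invented numbers): with child selectivities 0.1 and 0.2,
- // AND yields 0.1 * 0.2 = 0.02 and OR yields 0.1 + 0.2 - 0.02 = 0.28; for a
- // NOT with a single child of selectivity 0.1 the result is 0.9. The clamp
- // above keeps the value in [0.0, 1.0].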
- }
-
- /**
- * Retrieve the slots bound by BinaryPredicate, InPredicate and
- * CompoundPredicates in the subtree rooted at 'this'.
- */
- public ArrayList<SlotRef> getBoundSlots() {
- ArrayList<SlotRef> slots = Lists.newArrayList();
- for (int i = 0; i < getChildren().size(); ++i) {
- if (getChild(i) instanceof BinaryPredicate ||
- getChild(i) instanceof InPredicate) {
- slots.add(((Predicate)getChild(i)).getBoundSlot());
- } else if (getChild(i) instanceof CompoundPredicate) {
- slots.addAll(((CompoundPredicate)getChild(i)).getBoundSlots());
- }
- }
- return slots;
- }
-
- /**
- * Negates a CompoundPredicate.
- */
- @Override
- public Expr negate() {
- if (op_ == Operator.NOT) return getChild(0);
- Expr negatedLeft = getChild(0).negate();
- Expr negatedRight = getChild(1).negate();
- Operator newOp = (op_ == Operator.OR) ? Operator.AND : Operator.OR;
- return new CompoundPredicate(newOp, negatedLeft, negatedRight);
- }
-
- /**
- * Creates a conjunctive predicate from a list of exprs.
- */
- public static Expr createConjunctivePredicate(List<Expr> conjuncts) {
- Expr conjunctivePred = null;
- for (Expr expr: conjuncts) {
- if (conjunctivePred == null) {
- conjunctivePred = expr;
- continue;
- }
- conjunctivePred = new CompoundPredicate(CompoundPredicate.Operator.AND,
- expr, conjunctivePred);
- }
- return conjunctivePred;
- }
-
- @Override
- public Expr clone() { return new CompoundPredicate(this); }
-
- // Create an AND predicate between two exprs, 'lhs' and 'rhs'. If
- // 'rhs' is null, simply return 'lhs'.
- public static Expr createConjunction(Expr lhs, Expr rhs) {
- if (rhs == null) return lhs;
- return new CompoundPredicate(Operator.AND, rhs, lhs);
- }
-}
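
A hedged usage sketch of the helpers above ('a' and 'b' stand in for already-analyzed
boolean Exprs; the names are invented):

    // negate() applies De Morgan's law: AND flips to OR (and vice versa) and
    // each child is negated; negate() on a NOT node just unwraps its child.
    Expr negated = new CompoundPredicate(CompoundPredicate.Operator.AND, a, b).negate();
    // createConjunction() treats a null second argument as the identity, so a
    // list of conjuncts can be folded pairwise:
    Expr conj = CompoundPredicate.createConjunction(a, null); // returns 'a'
    conj = CompoundPredicate.createConjunction(b, conj);      // children: (conj, b)
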
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java
deleted file mode 100644
index cd01713..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java
+++ /dev/null
@@ -1,553 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.log4j.Logger;
-
-import com.cloudera.impala.authorization.Privilege;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.catalog.View;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.thrift.TComputeStatsParams;
-import com.cloudera.impala.thrift.TPartitionStats;
-import com.cloudera.impala.thrift.TTableName;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Represents a COMPUTE STATS <table> and COMPUTE INCREMENTAL STATS <table> [PARTITION
- * <part_spec>] statement for statistics collection. The former statement gathers all
- * table and column stats for a given table and stores them in the Metastore via the
- * CatalogService. All existing stats for that table are replaced and no existing stats
- * are reused. The latter, incremental form, similarly computes stats for the whole table
- * but does so by re-using stats from partitions which have 'valid' statistics.
- * Statistics are currently 'valid' if they exist; in the future they may be expired
- * based on recency, etc.
- *
- * TODO: Allow more coarse/fine grained (db, column)
- * TODO: Compute stats on complex types.
- */
-public class ComputeStatsStmt extends StatementBase {
- private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class);
-
- private static String AVRO_SCHEMA_MSG_PREFIX = "Cannot COMPUTE STATS on Avro table " +
- "'%s' because its column definitions do not match those in the Avro schema.";
- private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
- "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
-
- protected final TableName tableName_;
-
- // Set during analysis.
- protected Table table_;
-
- // The NULL count is not currently used in optimization or at run time, and
- // compute stats runs 2x faster in many cases when not counting NULLs.
- private static final boolean COUNT_NULLS = false;
-
- // Query for getting the per-partition row count and the total row count.
- // Set during analysis.
- protected String tableStatsQueryStr_;
-
- // Query for getting the per-column NDVs and number of NULLs.
- // Set during analysis.
- protected String columnStatsQueryStr_;
-
- // If true, stats will be gathered incrementally per-partition.
- private boolean isIncremental_ = false;
-
- // If true, expect the compute stats process to produce output for all partitions in the
- // target table (only meaningful, therefore, if partitioned). This is always true for
- // non-incremental computations. If set, expectedPartitions_ will be empty - the point
- // of this flag is to optimise the case where all partitions are targeted.
- private boolean expectAllPartitions_ = false;
-
- // The list of valid partition statistics that can be used in an incremental computation
- // without themselves being recomputed. Populated in analyze().
- private final List<TPartitionStats> validPartStats_ = Lists.newArrayList();
-
- // For incremental computations, the list of partitions (identified by list of partition
- // column values) that we expect to receive results for. Used to ensure that even empty
- // partitions emit results.
- // TODO: Consider using partition IDs (and adding them to the child queries with a
- // PARTITION_ID() builtin)
- private final List<List<String>> expectedPartitions_ = Lists.newArrayList();
-
- // If non-null, the partition that an incremental computation might apply to. Must be
- // null if this is a non-incremental computation.
- private PartitionSpec partitionSpec_ = null;
-
- // The maximum number of partitions that may be explicitly selected by filter
- // predicates. Any query that selects more than this automatically drops back to a full
- // incremental stats recomputation.
- // TODO: We can probably do better than this, e.g. running several queries, each of
- // which selects up to MAX_INCREMENTAL_PARTITIONS partitions.
- private static final int MAX_INCREMENTAL_PARTITIONS = 1000;
-
- /**
- * Constructor for the non-incremental form of COMPUTE STATS.
- */
- protected ComputeStatsStmt(TableName tableName) {
- this(tableName, false, null);
- }
-
- /**
- * Constructor for the incremental form of COMPUTE STATS. If isIncremental is true,
- * statistics will be recomputed incrementally; if false they will be recomputed for the
- * whole table. The partition spec partSpec can specify a single partition whose stats
- * should be recomputed.
- */
- protected ComputeStatsStmt(TableName tableName, boolean isIncremental,
- PartitionSpec partSpec) {
- Preconditions.checkState(tableName != null && !tableName.isEmpty());
- Preconditions.checkState(isIncremental || partSpec == null);
- this.tableName_ = tableName;
- this.table_ = null;
- this.isIncremental_ = isIncremental;
- this.partitionSpec_ = partSpec;
- if (partitionSpec_ != null) {
- partitionSpec_.setTableName(tableName);
- partitionSpec_.setPrivilegeRequirement(Privilege.ALTER);
- }
- }
-
- /**
- * Utility method for constructing the child queries: adds the partition columns to
- * both a select list and a group-by list.
- */
- private void addPartitionCols(HdfsTable table, List<String> selectList,
- List<String> groupByCols) {
- for (int i = 0; i < table.getNumClusteringCols(); ++i) {
- String colRefSql = ToSqlUtils.getIdentSql(table.getColumns().get(i).getName());
- groupByCols.add(colRefSql);
- // Also add the partition column to the select list so that each result row
- // carries its partition values.
- selectList.add(colRefSql);
- }
- }
-
- private List<String> getBaseColumnStatsQuerySelectList(Analyzer analyzer) {
- List<String> columnStatsSelectList = Lists.newArrayList();
- // For Hdfs tables, exclude partition columns from stats gathering because Hive
- // cannot store them as part of the non-partition column stats. For HBase tables,
- // include the single clustering column (the row key).
- int startColIdx = (table_ instanceof HBaseTable) ? 0 : table_.getNumClusteringCols();
- final String ndvUda = isIncremental_ ? "NDV_NO_FINALIZE" : "NDV";
-
- for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
- Column c = table_.getColumns().get(i);
- Type type = c.getType();
-
- // Ignore columns with an invalid/unsupported type. For example, complex types in
- // an HBase-backed table will appear as invalid types.
- if (!type.isValid() || !type.isSupported()
- || c.getType().isComplexType()) {
- continue;
- }
- // NDV approximation function. Add explicit alias for later identification when
- // updating the Metastore.
- String colRefSql = ToSqlUtils.getIdentSql(c.getName());
- columnStatsSelectList.add(ndvUda + "(" + colRefSql + ") AS " + colRefSql);
-
- if (COUNT_NULLS) {
- // Count the number of NULL values.
- columnStatsSelectList.add("COUNT(IF(" + colRefSql + " IS NULL, 1, NULL))");
- } else {
- // Using -1 to indicate "unknown". We need cast to BIGINT because backend expects
- // an i64Val as the number of NULLs returned by the COMPUTE STATS column stats
- // child query. See CatalogOpExecutor::SetColumnStats(). If we do not cast, then
- // the -1 will be treated as TINYINT, resulting in a 0 being placed in the #NULLs
- // column (see IMPALA-1068).
- columnStatsSelectList.add("CAST(-1 as BIGINT)");
- }
-
- // For STRING columns also compute the max and avg string length.
- if (type.isStringType()) {
- columnStatsSelectList.add("MAX(length(" + colRefSql + "))");
- columnStatsSelectList.add("AVG(length(" + colRefSql + "))");
- } else {
- // For non-STRING columns we use the fixed size of the type.
- // We store the same information for all types to avoid having to
- // treat STRING columns specially in the BE CatalogOpExecutor.
- Integer typeSize = type.getPrimitiveType().getSlotSize();
- columnStatsSelectList.add(typeSize.toString());
- columnStatsSelectList.add("CAST(" + typeSize.toString() + " as DOUBLE)");
- }
-
- if (isIncremental_) {
- // Need the count in order to properly combine per-partition column stats
- columnStatsSelectList.add("COUNT(" + colRefSql + ")");
- }
- }
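-
- // E.g. (invented column): a STRING column 'name' under incremental stats adds
- // NDV_NO_FINALIZE(name) AS name, CAST(-1 as BIGINT), MAX(length(name)),
- // AVG(length(name)), COUNT(name) to the select list.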
- return columnStatsSelectList;
- }
-
- /**
- * Constructs two queries to compute statistics for 'tableName_', if that table exists
- * (although if we can detect that no work needs to be done for either query, that query
- * will be 'null' and not executed).
- *
- * The first query computes the number of rows (on a per-partition basis if the table is
- * partitioned) and has the form "SELECT COUNT(*) FROM tbl GROUP BY part_col1,
- * part_col2...", with an optional WHERE clause for incremental computation (see below).
- *
- * The second query computes the NDV estimate, the average width, the maximum width and,
- * optionally, the number of nulls for each column. For non-partitioned tables (or
- * non-incremental computations), the query is simple:
- *
- * SELECT NDV(col), COUNT(<nulls>), MAX(length(col)), AVG(length(col)) FROM tbl
- *
- * (For non-string columns, the widths are hard-coded as they are known at query
- * construction time).
- *
- * If computation is incremental (i.e. the original statement was COMPUTE INCREMENTAL
- * STATS.., and the underlying table is a partitioned HdfsTable), some modifications are
- * made to the non-incremental per-column query. First, a different UDA,
- * NDV_NO_FINALIZE(), is used to retrieve and serialise the intermediate state from each
- * column. Second, the results are grouped by partition, as with the row count query, so
- * that the intermediate NDV computation state can be stored per-partition. The number
- * of rows per partition is also recorded.
- *
- * For both the row count query and the column stats query, the WHERE clause is used
- * to restrict execution to only those partitions that actually require new statistics
- * to be computed.
- *
- * SELECT NDV_NO_FINALIZE(col), <nulls, max, avg>, COUNT(col) FROM tbl
- * GROUP BY part_col1, part_col2, ...
- * WHERE ((part_col1 = p1_val1) AND (part_col2 = p1_val2)) OR
- * ((part_col1 = p2_val1) AND (part_col2 = p2_val2)) OR ...
- */
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- table_ = analyzer.getTable(tableName_, Privilege.ALTER);
- String sqlTableName = table_.getTableName().toSql();
- if (table_ instanceof View) {
- throw new AnalysisException(String.format(
- "COMPUTE STATS not supported for view %s", sqlTableName));
- }
-
- if (!(table_ instanceof HdfsTable)) {
- if (partitionSpec_ != null) {
- throw new AnalysisException("COMPUTE INCREMENTAL ... PARTITION not supported " +
- "for non-HDFS table " + table_.getTableName());
- }
- isIncremental_ = false;
- }
-
- // Ensure that we write an entry for every partition if this isn't incremental
- if (!isIncremental_) expectAllPartitions_ = true;
-
- HdfsTable hdfsTable = null;
- if (table_ instanceof HdfsTable) {
- hdfsTable = (HdfsTable)table_;
- if (isIncremental_ && hdfsTable.getNumClusteringCols() == 0 &&
- partitionSpec_ != null) {
- throw new AnalysisException(String.format(
- "Can't compute PARTITION stats on an unpartitioned table: %s",
- sqlTableName));
- } else if (partitionSpec_ != null) {
- partitionSpec_.setPartitionShouldExist();
- partitionSpec_.analyze(analyzer);
- for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
- // TODO: We could match the dynamic keys (i.e. as wildcards) as well, but that
- // would involve looping over all partitions and seeing which match the
- // partition spec.
- if (!kv.isStatic()) {
- throw new AnalysisException("All partition keys must have values: " +
- kv.toString());
- }
- }
- }
- // For incremental stats, estimate the size of intermediate stats and report an
- // error if the estimate is greater than MAX_INCREMENTAL_STATS_SIZE_BYTES.
- if (isIncremental_) {
- long statsSizeEstimate = hdfsTable.getColumns().size() *
- hdfsTable.getPartitions().size() * HdfsTable.STATS_SIZE_PER_COLUMN_BYTES;
- if (statsSizeEstimate > HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES) {
- LOG.error("Incremental stats size estimate for table " + hdfsTable.getName() +
- " exceeded " + HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES + ", estimate = "
- + statsSizeEstimate);
- throw new AnalysisException("Incremental stats size estimate exceeds "
- + PrintUtils.printBytes(HdfsTable.MAX_INCREMENTAL_STATS_SIZE_BYTES)
- + ". Please try COMPUTE STATS instead.");
- }
- }
- }
-
- // Build partition filters that only select partitions without valid statistics for
- // incremental computation.
- List<String> filterPreds = Lists.newArrayList();
- if (isIncremental_) {
- if (partitionSpec_ == null) {
- // If any column does not have stats, we recompute statistics for all partitions
- // TODO: need a better way to invalidate stats for all partitions, so that we can
- // use this logic to only recompute new / changed columns.
- boolean tableIsMissingColStats = false;
-
- // We'll warn the user if a column is missing stats (and therefore we rescan the
- // whole table), but if all columns are missing stats, the table just doesn't have
- // any stats and there's no need to warn.
- boolean allColumnsMissingStats = true;
- String exampleColumnMissingStats = null;
- // Partition columns always have stats, so exclude them from this search
- for (Column col: table_.getNonClusteringColumns()) {
- if (!col.getStats().hasStats()) {
- if (!tableIsMissingColStats) {
- tableIsMissingColStats = true;
- exampleColumnMissingStats = col.getName();
- }
- } else {
- allColumnsMissingStats = false;
- }
- }
-
- if (tableIsMissingColStats && !allColumnsMissingStats) {
- analyzer.addWarning("Column " + exampleColumnMissingStats +
- " does not have statistics, recomputing stats for the whole table");
- }
-
- for (HdfsPartition p: hdfsTable.getPartitions()) {
- if (p.isDefaultPartition()) continue;
- TPartitionStats partStats = p.getPartitionStats();
- if (!p.hasIncrementalStats() || tableIsMissingColStats) {
- if (partStats == null) LOG.trace(p.toString() + " does not have stats");
- if (!tableIsMissingColStats) filterPreds.add(p.getConjunctSql());
- List<String> partValues = Lists.newArrayList();
- for (LiteralExpr partValue: p.getPartitionValues()) {
- partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue,
- "NULL"));
- }
- expectedPartitions_.add(partValues);
- } else {
- LOG.trace(p.toString() + " does have statistics");
- validPartStats_.add(partStats);
- }
- }
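- // The -1 accounts for the default partition, which the loop above skips; if
- // every real partition needs recomputing, switch to the expect-all fast path.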
- if (expectedPartitions_.size() == hdfsTable.getPartitions().size() - 1) {
- expectedPartitions_.clear();
- expectAllPartitions_ = true;
- }
- } else {
- // Always compute stats on a particular partition when told to.
- List<String> partitionConjuncts = Lists.newArrayList();
- for (PartitionKeyValue kv: partitionSpec_.getPartitionSpecKeyValues()) {
- partitionConjuncts.add(kv.toPredicateSql());
- }
- filterPreds.add("(" + Joiner.on(" AND ").join(partitionConjuncts) + ")");
- HdfsPartition targetPartition =
- hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
- List<String> partValues = Lists.newArrayList();
- for (LiteralExpr partValue: targetPartition.getPartitionValues()) {
- partValues.add(PartitionKeyValue.getPartitionKeyValueString(partValue,
- "NULL"));
- }
- expectedPartitions_.add(partValues);
- for (HdfsPartition p: hdfsTable.getPartitions()) {
- if (p.isDefaultPartition()) continue;
- if (p == targetPartition) continue;
- TPartitionStats partStats = p.getPartitionStats();
- if (partStats != null) validPartStats_.add(partStats);
- }
- }
-
- if (filterPreds.size() == 0 && validPartStats_.size() != 0) {
- LOG.info("No partitions selected for incremental stats update");
- analyzer.addWarning("No partitions selected for incremental stats update");
- return;
- }
- }
-
- if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) {
- // TODO: Consider simply running for MAX_INCREMENTAL_PARTITIONS partitions, and then
- // advising the user to iterate.
- analyzer.addWarning(
- "Too many partitions selected, doing full recomputation of incremental stats");
- filterPreds.clear();
- validPartStats_.clear();
- }
-
- List<String> groupByCols = Lists.newArrayList();
- List<String> partitionColsSelectList = Lists.newArrayList();
- // Only add group by clause for HdfsTables.
- if (hdfsTable != null) {
- if (hdfsTable.isAvroTable()) checkIncompleteAvroSchema(hdfsTable);
- addPartitionCols(hdfsTable, partitionColsSelectList, groupByCols);
- }
-
- // Query for getting the per-partition row count and the total row count.
- StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT ");
- List<String> tableStatsSelectList = Lists.newArrayList();
- tableStatsSelectList.add("COUNT(*)");
-
- tableStatsSelectList.addAll(partitionColsSelectList);
- tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList));
- tableStatsQueryBuilder.append(" FROM " + sqlTableName);
-
- // Query for getting the per-column NDVs and number of NULLs.
- List<String> columnStatsSelectList = getBaseColumnStatsQuerySelectList(analyzer);
-
- if (isIncremental_) columnStatsSelectList.addAll(partitionColsSelectList);
-
- StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT ");
- columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList));
- columnStatsQueryBuilder.append(" FROM " + sqlTableName);
-
- // Add the WHERE clause to filter out partitions that we don't want to compute
- // incremental stats for. While this is a win in most situations, we would like to
- // avoid this where it does no useful work (i.e. it selects all rows). This happens
- // when there are no existing valid partitions (so all partitions will have been
- // selected in) and there is no partition spec (so no single partition was explicitly
- // selected in).
- if (filterPreds.size() > 0 &&
- (validPartStats_.size() > 0 || partitionSpec_ != null)) {
- String filterClause = " WHERE " + Joiner.on(" OR ").join(filterPreds);
- columnStatsQueryBuilder.append(filterClause);
- tableStatsQueryBuilder.append(filterClause);
- }
-
- if (groupByCols.size() > 0) {
- String groupBy = " GROUP BY " + Joiner.on(", ").join(groupByCols);
- if (isIncremental_) columnStatsQueryBuilder.append(groupBy);
- tableStatsQueryBuilder.append(groupBy);
- }
-
- tableStatsQueryStr_ = tableStatsQueryBuilder.toString();
- LOG.debug("Table stats query: " + tableStatsQueryStr_);
-
- if (columnStatsSelectList.isEmpty()) {
- // Table doesn't have any columns that we can compute stats for.
- LOG.info("No supported column types in table " + table_.getTableName() +
- ", no column statistics will be gathered.");
- columnStatsQueryStr_ = null;
- return;
- }
-
- columnStatsQueryStr_ = columnStatsQueryBuilder.toString();
- LOG.debug("Column stats query: " + columnStatsQueryStr_);
- }
-
- /**
- * Checks whether the column definitions from the CREATE TABLE stmt match the columns
- * in the Avro schema. If there is a mismatch, then COMPUTE STATS cannot update the
- * statistics in the Metastore's backend DB due to HIVE-6308. Throws an
- * AnalysisException for such ill-created Avro tables. Does nothing if
- * the column definitions match the Avro schema exactly.
- */
- private void checkIncompleteAvroSchema(HdfsTable table) throws AnalysisException {
- Preconditions.checkState(table.isAvroTable());
- org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
- // The column definitions from 'CREATE TABLE (column definitions) ...'
- Iterator<FieldSchema> colDefs = msTable.getSd().getCols().iterator();
- // The columns derived from the Avro schema file or literal schema.
- // Inconsistencies between the Avro-schema columns and the column definitions
- // are sometimes resolved in the CREATE TABLE, and sometimes not (see below).
- Iterator<Column> avroSchemaCols = table.getColumns().iterator();
- // Skip partition columns from 'table' since those are not present in
- // the msTable field schemas.
- for (int i = 0; i < table.getNumClusteringCols(); ++i) {
- if (avroSchemaCols.hasNext()) avroSchemaCols.next();
- }
- int pos = 0;
- while (colDefs.hasNext() || avroSchemaCols.hasNext()) {
- if (colDefs.hasNext() && avroSchemaCols.hasNext()) {
- FieldSchema colDef = colDefs.next();
- Column avroSchemaCol = avroSchemaCols.next();
- // Check that the column names are identical. Ignore mismatched types
- // as those will either fail in the scan or succeed.
- if (!colDef.getName().equalsIgnoreCase(avroSchemaCol.getName())) {
- throw new AnalysisException(
- String.format(AVRO_SCHEMA_MSG_PREFIX +
- "\nDefinition of column '%s' of type '%s' does not match " +
- "the Avro-schema column '%s' of type '%s' at position '%s'.\n" +
- AVRO_SCHEMA_MSG_SUFFIX,
- table.getName(), colDef.getName(), colDef.getType(),
- avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
- }
- }
- // The following two cases are typically not possible because Hive resolves
- // inconsistencies between the column-definition list and the Avro schema if a
- // column-definition list was given in the CREATE TABLE (having no column
- // definitions at all results in HIVE-6308). Even so, we check these cases for
- // extra safety. COMPUTE STATS could be made to succeed in special instances of
- // the cases below but we chose to throw an AnalysisException to avoid confusion
- // because this scenario "should" never arise as mentioned above.
- if (colDefs.hasNext() && !avroSchemaCols.hasNext()) {
- FieldSchema colDef = colDefs.next();
- throw new AnalysisException(
- String.format(AVRO_SCHEMA_MSG_PREFIX +
- "\nMissing Avro-schema column corresponding to column " +
- "definition '%s' of type '%s' at position '%s'.\n" +
- AVRO_SCHEMA_MSG_SUFFIX,
- table.getName(), colDef.getName(), colDef.getType(), pos));
- }
- if (!colDefs.hasNext() && avroSchemaCols.hasNext()) {
- Column avroSchemaCol = avroSchemaCols.next();
- throw new AnalysisException(
- String.format(AVRO_SCHEMA_MSG_PREFIX +
- "\nMissing column definition corresponding to Avro-schema " +
- "column '%s' of type '%s' at position '%s'.\n" +
- AVRO_SCHEMA_MSG_SUFFIX,
- table.getName(), avroSchemaCol.getName(), avroSchemaCol.getType(), pos));
- }
- ++pos;
- }
- }
-
- public String getTblStatsQuery() { return tableStatsQueryStr_; }
- public String getColStatsQuery() { return columnStatsQueryStr_; }
-
- @Override
- public String toSql() {
- if (!isIncremental_) {
- return "COMPUTE STATS " + tableName_.toSql();
- } else {
- return "COMPUTE INCREMENTAL STATS " + tableName_.toSql() +
- partitionSpec_ == null ? "" : partitionSpec_.toSql();
- }
- }
-
- public TComputeStatsParams toThrift() {
- TComputeStatsParams params = new TComputeStatsParams();
- params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName()));
- params.setTbl_stats_query(tableStatsQueryStr_);
- if (columnStatsQueryStr_ != null) {
- params.setCol_stats_query(columnStatsQueryStr_);
- } else {
- params.setCol_stats_queryIsSet(false);
- }
-
- params.setIs_incremental(isIncremental_);
- params.setExisting_part_stats(validPartStats_);
- params.setExpect_all_partitions(expectAllPartitions_);
- if (!expectAllPartitions_) params.setExpected_partitions(expectedPartitions_);
- if (isIncremental_) {
- params.setNum_partition_cols(((HdfsTable)table_).getNumClusteringCols());
- }
- return params;
- }
-}
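
As an invented illustration of the two queries analyze() builds above, for an
HdfsTable sales(id INT, name STRING) partitioned by (day STRING), plain COMPUTE STATS
would generate roughly:

    SELECT COUNT(*), day FROM sales GROUP BY day

    SELECT NDV(id) AS id, CAST(-1 as BIGINT), 4, CAST(4 as DOUBLE),
           NDV(name) AS name, CAST(-1 as BIGINT),
           MAX(length(name)), AVG(length(name))
    FROM sales

The incremental form swaps in NDV_NO_FINALIZE, appends COUNT(col) per column, adds the
partition columns and a GROUP BY, and restricts both queries with a WHERE clause over
the partitions lacking valid stats.
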
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java
deleted file mode 100644
index 1ee6fd4..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/CreateDataSrcStmt.java
+++ /dev/null
@@ -1,97 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-
-import com.cloudera.impala.authorization.Privilege;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.extdatasource.ApiVersion;
-import com.cloudera.impala.thrift.TCreateDataSourceParams;
-import com.cloudera.impala.thrift.TDataSource;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-/**
- * Represents a CREATE DATA SOURCE statement.
- */
-public class CreateDataSrcStmt extends StatementBase {
- private final String dataSrcName_;
- private final String className_;
- private final String apiVersionString_;
- private final HdfsUri location_;
- private final boolean ifNotExists_;
- private ApiVersion apiVersion_;
-
- public CreateDataSrcStmt(String dataSrcName, HdfsUri location, String className,
- String apiVersionString, boolean ifNotExists) {
- Preconditions.checkNotNull(dataSrcName);
- Preconditions.checkNotNull(className);
- Preconditions.checkNotNull(apiVersionString);
- Preconditions.checkNotNull(location);
- dataSrcName_ = dataSrcName.toLowerCase();
- location_ = location;
- className_ = className;
- apiVersionString_ = apiVersionString;
- ifNotExists_ = ifNotExists;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- if (!MetaStoreUtils.validateName(dataSrcName_)) {
- throw new AnalysisException("Invalid data source name: " + dataSrcName_);
- }
- if (!ifNotExists_ && analyzer.getCatalog().getDataSource(dataSrcName_) != null) {
- throw new AnalysisException(Analyzer.DATA_SRC_ALREADY_EXISTS_ERROR_MSG +
- dataSrcName_);
- }
-
- apiVersion_ = ApiVersion.parseApiVersion(apiVersionString_);
- if (apiVersion_ == null) {
- throw new AnalysisException("Invalid API version: '" + apiVersionString_ +
- "'. Valid API versions: " + Joiner.on(", ").join(ApiVersion.values()));
- }
-
- location_.analyze(analyzer, Privilege.ALL, FsAction.READ);
- // TODO: Check class exists and implements API version
- // TODO: authorization check
- }
-
- @Override
- public String toSql() {
- StringBuilder sb = new StringBuilder();
- sb.append("CREATE DATA SOURCE ");
- if (ifNotExists_) sb.append("IF NOT EXISTS ");
- sb.append(dataSrcName_);
- sb.append(" LOCATION '");
- sb.append(location_.getLocation());
- sb.append("' CLASS '");
- sb.append(className_);
- sb.append("' API_VERSION '");
- sb.append(apiVersion_.name());
- sb.append("'");
- return sb.toString();
- }
-
- public TCreateDataSourceParams toThrift() {
- return new TCreateDataSourceParams(
- new TDataSource(dataSrcName_, location_.toString(), className_,
- apiVersion_.name())).setIf_not_exists(ifNotExists_);
- }
-}
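To make the generated SQL concrete, a hedged usage sketch (all values hypothetical; assumes HdfsUri is constructible from a path string, as its use elsewhere in this diff suggests, and that ApiVersion has a constant named V1):

// Hypothetical invocation mirroring the constructor signature above.
CreateDataSrcStmt stmt = new CreateDataSrcStmt(
    "my_source",
    new HdfsUri("/user/impala/udfs/my-data-source.jar"),  // assumed ctor
    "com.example.MyDataSource",
    "V1",
    /* ifNotExists */ true);
// After analyze() resolves apiVersion_, toSql() renders, on one line:
// CREATE DATA SOURCE IF NOT EXISTS my_source LOCATION
//   '/user/impala/udfs/my-data-source.jar' CLASS 'com.example.MyDataSource'
//   API_VERSION 'V1'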
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java
deleted file mode 100644
index 3dedd8b..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/CreateDbStmt.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-
-import com.cloudera.impala.authorization.Privilege;
-import com.cloudera.impala.catalog.Db;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.thrift.TCreateDbParams;
-
-/**
- * Represents a CREATE DATABASE statement
- */
-public class CreateDbStmt extends StatementBase {
- private final String dbName_;
- private final HdfsUri location_;
- private final String comment_;
- private final boolean ifNotExists_;
-
- /**
- * Creates a database with the given name.
- */
- public CreateDbStmt(String dbName) {
- this(dbName, null, null, false);
- }
-
- /**
- * Creates a database with the given name, comment, and HDFS table storage location.
- * New tables created in the database inherit the location property for their default
- * storage location. The statement throws an error if the database already exists,
- * unless ifNotExists is true.
- */
- public CreateDbStmt(String dbName, String comment, HdfsUri location,
- boolean ifNotExists) {
- this.dbName_ = dbName;
- this.comment_ = comment;
- this.location_ = location;
- this.ifNotExists_ = ifNotExists;
- }
-
- public String getComment() { return comment_; }
- public String getDb() { return dbName_; }
- public boolean getIfNotExists() { return ifNotExists_; }
- public HdfsUri getLocation() { return location_; }
-
- @Override
- public String toSql() {
- StringBuilder sb = new StringBuilder("CREATE DATABASE");
- if (ifNotExists_) sb.append(" IF NOT EXISTS");
- sb.append(" " + dbName_);
- if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'");
- if (location_ != null) sb.append(" LOCATION '" + location_ + "'");
- return sb.toString();
- }
-
- public TCreateDbParams toThrift() {
- TCreateDbParams params = new TCreateDbParams();
- params.setDb(getDb());
- params.setComment(getComment());
- params.setLocation(location_ == null ? null : location_.toString());
- params.setIf_not_exists(getIfNotExists());
- return params;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- // Check whether the db name meets the Metastore's requirements.
- if (!MetaStoreUtils.validateName(dbName_)) {
- throw new AnalysisException("Invalid database name: " + dbName_);
- }
-
- // Note: It is possible that a database with the same name was created external to
- // this Impala instance. If that happens, the caller will not get an
- // AnalysisException when creating the database, they will get a Hive
- // AlreadyExistsException once the request has been sent to the metastore.
- Db db = analyzer.getDb(getDb(), Privilege.CREATE, false);
- if (db != null && !ifNotExists_) {
- throw new AnalysisException(Analyzer.DB_ALREADY_EXISTS_ERROR_MSG + getDb());
- }
-
- if (location_ != null) {
- location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
- }
- }
-}
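For reference, a sketch of the SQL this class's toSql() emits (hypothetical values):

// new CreateDbStmt("analytics", "nightly test db", null, true).toSql()
// => CREATE DATABASE IF NOT EXISTS analytics COMMENT 'nightly test db'
// new CreateDbStmt("analytics").toSql()
// => CREATE DATABASE analytics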
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateDropRoleStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateDropRoleStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateDropRoleStmt.java
deleted file mode 100644
index ef90b8a..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/CreateDropRoleStmt.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import com.cloudera.impala.catalog.Role;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.thrift.TCreateDropRoleParams;
-import com.google.common.base.Preconditions;
-
-/**
- * Represents a "CREATE ROLE" or "DROP ROLE" statement.
- */
-public class CreateDropRoleStmt extends AuthorizationStmt {
- private final String roleName_;
- private final boolean isDropRole_;
-
- // Set in analysis
- private String user_;
-
- public CreateDropRoleStmt(String roleName, boolean isDropRole) {
- Preconditions.checkNotNull(roleName);
- roleName_ = roleName;
- isDropRole_ = isDropRole;
- }
-
- @Override
- public String toSql() {
- return String.format("%s ROLE %s", roleName_, isDropRole_ ? "DROP" : "CREATE");
- }
-
- public TCreateDropRoleParams toThrift() {
- TCreateDropRoleParams params = new TCreateDropRoleParams();
- params.setRole_name(roleName_);
- params.setIs_drop(isDropRole_);
- return params;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- super.analyze(analyzer);
- Role existingRole = analyzer.getCatalog().getAuthPolicy().getRole(roleName_);
- if (isDropRole_ && existingRole == null) {
- throw new AnalysisException(String.format("Role '%s' does not exist.", roleName_));
- } else if (!isDropRole_ && existingRole != null) {
- throw new AnalysisException(String.format("Role '%s' already exists.", roleName_));
- }
- }
-}
\ No newline at end of file
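With the keyword in the first format slot and the role name in the second, toSql() round-trips the statement as expected (hypothetical values):

// new CreateDropRoleStmt("analyst", /* isDropRole */ true).toSql()
// => DROP ROLE analyst
// new CreateDropRoleStmt("analyst", /* isDropRole */ false).toSql()
// => CREATE ROLE analyst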
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java
deleted file mode 100644
index ebfd7b6..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java
+++ /dev/null
@@ -1,206 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.hadoop.fs.permission.FsAction;
-
-import com.cloudera.impala.authorization.AuthorizeableFn;
-import com.cloudera.impala.authorization.Privilege;
-import com.cloudera.impala.authorization.PrivilegeRequest;
-import com.cloudera.impala.catalog.Catalog;
-import com.cloudera.impala.catalog.Db;
-import com.cloudera.impala.catalog.Function;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.thrift.TCreateFunctionParams;
-import com.cloudera.impala.thrift.TFunctionBinaryType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Base class for CREATE [AGGREGATE] FUNCTION.
- */
-public abstract class CreateFunctionStmtBase extends StatementBase {
-
- // Enums for valid keys for optional arguments.
- public enum OptArg {
- COMMENT,
- SYMBOL, // Only used for Udfs
- PREPARE_FN, // Only used for Udfs
- CLOSE_FN, // Only used for Udfs
- UPDATE_FN, // Only used for Udas
- INIT_FN, // Only used for Udas
- SERIALIZE_FN, // Only used for Udas
- MERGE_FN, // Only used for Udas
- FINALIZE_FN // Only used for Udas
- };
-
- protected final FunctionName fnName_;
- protected final FunctionArgs args_;
- protected final TypeDef retTypeDef_;
- protected final HdfsUri location_;
- protected final HashMap<CreateFunctionStmtBase.OptArg, String> optArgs_;
- protected final boolean ifNotExists_;
-
- // Result of analysis.
- protected Function fn_;
-
- // Db object for function fn_. Set in analyze().
- protected Db db_;
-
- // Set in analyze()
- protected String sqlString_;
-
- protected CreateFunctionStmtBase(FunctionName fnName, FunctionArgs args,
- TypeDef retTypeDef, HdfsUri location, boolean ifNotExists,
- HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) {
- // The return and arg types must either be both null or non-null.
- Preconditions.checkState(!(args == null ^ retTypeDef == null));
- fnName_ = fnName;
- args_ = args;
- retTypeDef_ = retTypeDef;
- location_ = location;
- ifNotExists_ = ifNotExists;
- optArgs_ = optArgs;
- }
-
- public String getComment() { return optArgs_.get(OptArg.COMMENT); }
- public boolean getIfNotExists() { return ifNotExists_; }
- public boolean hasSignature() { return args_ != null; }
-
- public TCreateFunctionParams toThrift() {
- TCreateFunctionParams params = new TCreateFunctionParams(fn_.toThrift());
- params.setIf_not_exists(getIfNotExists());
- return params;
- }
-
- // Returns optArg[key], first validating that it is set.
- protected String checkAndGetOptArg(OptArg key)
- throws AnalysisException {
- if (!optArgs_.containsKey(key)) {
- throw new AnalysisException("Argument '" + key + "' must be set.");
- }
- return optArgs_.get(key);
- }
-
- protected void checkOptArgNotSet(OptArg key)
- throws AnalysisException {
- if (optArgs_.containsKey(key)) {
- throw new AnalysisException("Optional argument '" + key + "' should not be set.");
- }
- }
-
- // Returns the function's binary type based on the path extension.
- private TFunctionBinaryType getBinaryType() throws AnalysisException {
- TFunctionBinaryType binaryType = null;
- String binaryPath = fn_.getLocation().getLocation();
- int suffixIndex = binaryPath.lastIndexOf(".");
- if (suffixIndex != -1) {
- String suffix = binaryPath.substring(suffixIndex + 1);
- if (suffix.equalsIgnoreCase("jar")) {
- binaryType = TFunctionBinaryType.JAVA;
- } else if (suffix.equalsIgnoreCase("so")) {
- binaryType = TFunctionBinaryType.NATIVE;
- } else if (suffix.equalsIgnoreCase("ll")) {
- binaryType = TFunctionBinaryType.IR;
- }
- }
- if (binaryType == null) {
- throw new AnalysisException("Unknown binary type: '" + binaryPath +
- "'. Binary must end in .jar, .so or .ll");
- }
- return binaryType;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- // Validate function name is legal
- fnName_.analyze(analyzer);
-
- if (hasSignature()) {
- // Validate function arguments and return type.
- args_.analyze(analyzer);
- retTypeDef_.analyze(analyzer);
- fn_ = createFunction(fnName_, args_.getArgTypes(), retTypeDef_.getType(),
- args_.hasVarArgs());
- } else {
- fn_ = createFunction(fnName_, null, null, false);
- }
-
- // For now, if authorization is enabled, the user needs ALL on the server
- // to create functions.
- // TODO: this is not the right granularity but acceptable for now.
- analyzer.registerPrivReq(new PrivilegeRequest(
- new AuthorizeableFn(fn_.signatureString()), Privilege.ALL));
-
- Db builtinsDb = analyzer.getCatalog().getDb(Catalog.BUILTINS_DB);
- if (builtinsDb.containsFunction(fn_.getName())) {
- throw new AnalysisException("Function cannot have the same name as a builtin: " +
- fn_.getFunctionName().getFunction());
- }
-
- db_ = analyzer.getDb(fn_.dbName(), Privilege.CREATE);
- Function existingFn = db_.getFunction(fn_, Function.CompareMode.IS_INDISTINGUISHABLE);
- if (existingFn != null && !ifNotExists_) {
- throw new AnalysisException(Analyzer.FN_ALREADY_EXISTS_ERROR_MSG +
- existingFn.signatureString());
- }
-
- location_.analyze(analyzer, Privilege.CREATE, FsAction.READ);
- fn_.setLocation(location_);
-
- // Check the file type from the binary type to infer the type of the UDA
- fn_.setBinaryType(getBinaryType());
-
- // Forbid unsupported and complex types.
- if (hasSignature()) {
- List<Type> refdTypes = Lists.newArrayList(fn_.getReturnType());
- refdTypes.addAll(Lists.newArrayList(fn_.getArgs()));
- for (Type t: refdTypes) {
- if (!t.isSupported() || t.isComplexType()) {
- throw new AnalysisException(
- String.format("Type '%s' is not supported in UDFs/UDAs.", t.toSql()));
- }
- }
- } else if (fn_.getBinaryType() != TFunctionBinaryType.JAVA) {
- throw new AnalysisException(
- String.format("Native functions require a return type and/or " +
- "argument types: %s", fn_.getFunctionName()));
- }
-
- // Check if the function can be persisted. We persist all native/IR functions
- // and also JAVA functions added without signature. Only JAVA functions added
- // with signatures aren't persisted.
- if (getBinaryType() == TFunctionBinaryType.JAVA && hasSignature()) {
- fn_.setIsPersistent(false);
- } else {
- fn_.setIsPersistent(true);
- }
- }
-
- /**
- * Creates a concrete function.
- */
- protected abstract Function createFunction(FunctionName fnName,
- ArrayList<Type> argTypes, Type retType, boolean hasVarArgs);
-}
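getBinaryType() dispatches purely on the binary's file suffix. A standalone sketch of that mapping (hypothetical class and enum names; the local enum stands in for TFunctionBinaryType, and a null return stands in for the AnalysisException the real method throws):

public class BinaryTypeSketch {
  enum BinaryType { JAVA, NATIVE, IR }

  // Returns null for unrecognized suffixes; the real method raises an
  // AnalysisException in that case.
  static BinaryType binaryTypeForPath(String path) {
    int dot = path.lastIndexOf('.');
    if (dot == -1) return null;
    switch (path.substring(dot + 1).toLowerCase()) {
      case "jar": return BinaryType.JAVA;   // Java UDF/UDA jar
      case "so":  return BinaryType.NATIVE; // native shared object
      case "ll":  return BinaryType.IR;     // LLVM IR module
      default:    return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(binaryTypeForPath("/udfs/libudf.so")); // NATIVE
    System.out.println(binaryTypeForPath("/udfs/udf.jar"));   // JAVA
    System.out.println(binaryTypeForPath("/udfs/udf.ll"));    // IR
    System.out.println(binaryTypeForPath("/udfs/udf.txt"));   // null
  }
}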
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java
deleted file mode 100644
index cc04b04..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/CreateOrAlterViewStmtBase.java
+++ /dev/null
@@ -1,209 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.thrift.TCreateOrAlterViewParams;
-import com.cloudera.impala.thrift.TTableName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Base class for CREATE VIEW and ALTER VIEW AS SELECT statements.
- */
-public abstract class CreateOrAlterViewStmtBase extends StatementBase {
- private static final Logger LOG =
- LoggerFactory.getLogger(CreateOrAlterViewStmtBase.class);
-
- protected final boolean ifNotExists_;
- protected final TableName tableName_;
- protected final ArrayList<ColumnDef> columnDefs_;
- protected final String comment_;
- protected final QueryStmt viewDefStmt_;
-
- // Set during analysis
- protected String dbName_;
- protected String owner_;
-
- // The original SQL-string given as view definition. Set during analysis.
- // Corresponds to Hive's viewOriginalText.
- protected String originalViewDef_;
-
- // Query statement (as SQL string) that defines the View for view substitution.
- // It is a transformation of the original view definition, e.g., to enforce the
- // columnDefs even if the original view definition has explicit column aliases.
- // If column definitions were given, then this "expanded" view definition
- // wraps the original view definition in a select stmt as follows.
- //
- // SELECT viewName.origCol1 AS colDesc1, viewName.origCol2 AS colDesc2, ...
- // FROM (originalViewDef) AS viewName
- //
- // Corresponds to Hive's viewExpandedText, but is not identical to the SQL
- // Hive would produce in view creation.
- protected String inlineViewDef_;
-
- // Columns to use in the select list of the expanded SQL string and when registering
- // this view in the metastore. Set in analysis.
- protected ArrayList<ColumnDef> finalColDefs_;
-
- public CreateOrAlterViewStmtBase(boolean ifNotExists, TableName tableName,
- ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) {
- Preconditions.checkNotNull(tableName);
- Preconditions.checkNotNull(viewDefStmt);
- this.ifNotExists_ = ifNotExists;
- this.tableName_ = tableName;
- this.columnDefs_ = columnDefs;
- this.comment_ = comment;
- this.viewDefStmt_ = viewDefStmt;
- }
-
- /**
- * Sets the originalViewDef and the expanded inlineViewDef based on viewDefStmt.
- * If columnDefs were given, checks that they do not contain duplicate column names
- * and throws an exception if they do.
- */
- protected void createColumnAndViewDefs(Analyzer analyzer) throws AnalysisException {
- Preconditions.checkNotNull(dbName_);
- Preconditions.checkNotNull(owner_);
-
- // Set the finalColDefs to reflect the given column definitions.
- if (columnDefs_ != null) {
- Preconditions.checkState(!columnDefs_.isEmpty());
- if (columnDefs_.size() != viewDefStmt_.getColLabels().size()) {
- String cmp =
- (columnDefs_.size() > viewDefStmt_.getColLabels().size()) ? "more" : "fewer";
- throw new AnalysisException(String.format("Column-definition list has " +
- "%s columns (%s) than the view-definition query statement returns (%s).",
- cmp, columnDefs_.size(), viewDefStmt_.getColLabels().size()));
- }
-
- finalColDefs_ = columnDefs_;
- Preconditions.checkState(
- columnDefs_.size() == viewDefStmt_.getBaseTblResultExprs().size());
- for (int i = 0; i < columnDefs_.size(); ++i) {
- // Set type in the column definition from the view-definition statement.
- columnDefs_.get(i).setType(viewDefStmt_.getBaseTblResultExprs().get(i).getType());
- }
- } else {
- // Create list of column definitions from the view-definition statement.
- finalColDefs_ = Lists.newArrayList();
- List<Expr> exprs = viewDefStmt_.getBaseTblResultExprs();
- List<String> labels = viewDefStmt_.getColLabels();
- Preconditions.checkState(exprs.size() == labels.size());
- for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) {
- ColumnDef colDef = new ColumnDef(labels.get(i), null, null);
- colDef.setType(exprs.get(i).getType());
- finalColDefs_.add(colDef);
- }
- }
-
- // Check that the column definitions have valid names, and that there are no
- // duplicate column names.
- Set<String> distinctColNames = Sets.newHashSet();
- for (ColumnDef colDesc: finalColDefs_) {
- colDesc.analyze();
- if (!distinctColNames.add(colDesc.getColName().toLowerCase())) {
- throw new AnalysisException("Duplicate column name: " + colDesc.getColName());
- }
- }
-
- // Set original and expanded view-definition SQL strings.
- originalViewDef_ = viewDefStmt_.toSql();
-
- // If no column definitions were given, then the expanded view SQL is the same
- // as the original one.
- if (columnDefs_ == null) {
- inlineViewDef_ = originalViewDef_;
- return;
- }
-
- // Wrap the original view-definition statement into a SELECT to enforce the
- // given column definitions.
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
- for (int i = 0; i < finalColDefs_.size(); ++i) {
- String colRef = ToSqlUtils.getIdentSql(viewDefStmt_.getColLabels().get(i));
- String colAlias = ToSqlUtils.getIdentSql(finalColDefs_.get(i).getColName());
- sb.append(String.format("%s.%s AS %s", tableName_.getTbl(), colRef, colAlias));
- sb.append((i+1 != finalColDefs_.size()) ? ", " : "");
- }
- // Do not use 'AS' for table aliases because Hive only accepts them without 'AS'.
- sb.append(String.format(" FROM (%s) %s", originalViewDef_, tableName_.getTbl()));
- inlineViewDef_ = sb.toString();
- }
-
- /**
- * Computes the column lineage graph for a create/alter view statement.
- */
- protected void computeLineageGraph(Analyzer analyzer) {
- ColumnLineageGraph graph = analyzer.getColumnLineageGraph();
- List<String> colDefs = Lists.newArrayList();
- for (ColumnDef colDef: finalColDefs_) {
- colDefs.add(dbName_ + "." + getTbl() + "." + colDef.getColName());
- }
- graph.addTargetColumnLabels(colDefs);
- graph.computeLineageGraph(viewDefStmt_.getResultExprs(), analyzer);
- LOG.trace("lineage: " + graph.debugString());
- }
-
- public TCreateOrAlterViewParams toThrift() {
- TCreateOrAlterViewParams params = new TCreateOrAlterViewParams();
- params.setView_name(new TTableName(getDb(), getTbl()));
- for (ColumnDef col: finalColDefs_) {
- params.addToColumns(col.toThrift());
- }
- params.setOwner(getOwner());
- params.setIf_not_exists(getIfNotExists());
- params.setOriginal_view_def(originalViewDef_);
- params.setExpanded_view_def(inlineViewDef_);
- if (comment_ != null) params.setComment(comment_);
- return params;
- }
-
- /**
- * Can only be called after analysis, returns the name of the database the table will
- * be created within.
- */
- public String getDb() {
- Preconditions.checkNotNull(dbName_);
- return dbName_;
- }
-
- /**
- * Can only be called after analysis, returns the owner of the view to be created.
- */
- public String getOwner() {
- Preconditions.checkNotNull(owner_);
- return owner_;
- }
-
- public List<ColumnDef> getColumnDescs() { return columnDefs_; }
- public String getComment() { return comment_; }
- public boolean getIfNotExists() { return ifNotExists_; }
- public String getOriginalViewDef() { return originalViewDef_; }
- public String getInlineViewDef() { return inlineViewDef_; }
- public String getTbl() { return tableName_.getTbl(); }
-}
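A worked example of the view expansion performed by createColumnAndViewDefs() (hypothetical names): for CREATE VIEW v (a, b) AS SELECT c1, c2 FROM t, the query's column labels are c1 and c2 and the final column definitions are a and b, so the expanded inlineViewDef_ becomes

  SELECT v.c1 AS a, v.c2 AS b FROM (SELECT c1, c2 FROM t) v

which matches the pattern in the class comment, with no 'AS' before the table alias for Hive compatibility.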