Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:21 UTC
[04/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
new file mode 100644
index 0000000..7b59625
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.List;
+import java.util.EnumSet;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.HdfsTable;
+import com.cloudera.impala.catalog.KuduTable;
+import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import com.cloudera.impala.catalog.Table;
+import com.cloudera.impala.catalog.TableId;
+import com.cloudera.impala.catalog.TableLoadingException;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.service.CatalogOpExecutor;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents a CREATE TABLE AS SELECT (CTAS) statement
+ *
+ * The statement supports an optional PARTITIONED BY clause. Its syntax and semantics
+ * follow the PARTITION feature of INSERT FROM SELECT statements: inside the
+ * PARTITIONED BY (...) column list the user must specify the names of the columns to
+ * partition by. These column names must appear, in the specified order, at the end of
+ * the select statement. Remapping columns between the source and destination tables is
+ * not possible, because the destination table does not yet exist. Specifying static
+ * values for the partition columns is also not possible, as their types need to be
+ * deduced from columns in the select statement.
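+ *
+ * Example (illustrative; table and column names are hypothetical):
+ *   CREATE TABLE dst PARTITIONED BY (year) AS SELECT name, id, year FROM src;
+ * Here 'year' must be the last column of the select list; it supplies both the name
+ * and the type of the partition column.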
+ */
+public class CreateTableAsSelectStmt extends StatementBase {
+ private final CreateTableStmt createStmt_;
+
+  // List of partition columns from the PARTITIONED BY (...) clause. Set to null if no
+  // PARTITIONED BY clause was given.
+ private final List<String> partitionKeys_;
+
+ /////////////////////////////////////////
+ // BEGIN: Members that need to be reset()
+
+ private final InsertStmt insertStmt_;
+
+ // END: Members that need to be reset()
+ /////////////////////////////////////////
+
+ private final static EnumSet<THdfsFileFormat> SUPPORTED_INSERT_FORMATS =
+ EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT);
+
+ /**
+ * Builds a CREATE TABLE AS SELECT statement
+ */
+ public CreateTableAsSelectStmt(CreateTableStmt createStmt, QueryStmt queryStmt,
+ List<String> partitionKeys) {
+ Preconditions.checkNotNull(queryStmt);
+ Preconditions.checkNotNull(createStmt);
+ createStmt_ = createStmt;
+ partitionKeys_ = partitionKeys;
+ List<PartitionKeyValue> pkvs = null;
+ if (partitionKeys != null) {
+ pkvs = Lists.newArrayList();
+ for (String key: partitionKeys) {
+ pkvs.add(new PartitionKeyValue(key, null));
+ }
+ }
+ insertStmt_ = new InsertStmt(null, createStmt.getTblName(), false, pkvs,
+ null, queryStmt, null, false);
+ }
+
+ public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); }
+ public InsertStmt getInsertStmt() { return insertStmt_; }
+ public CreateTableStmt getCreateStmt() { return createStmt_; }
+ @Override
+ public String toSql() { return ToSqlUtils.getCreateTableSql(this); }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ if (isAnalyzed()) return;
+ super.analyze(analyzer);
+
+    // The analysis for CTAS happens in two phases. The first phase runs before the
+    // target table exists, and validates the CREATE statement and the query portion
+    // of the INSERT statement. If this passes, analysis is run over the full INSERT
+    // statement. To avoid duplicate registrations of table/colRefs, create a new root
+    // analyzer and clone the query statement for this initial pass.
+ Analyzer dummyRootAnalyzer = new Analyzer(analyzer.getCatalog(),
+ analyzer.getQueryCtx(), analyzer.getAuthzConfig());
+ QueryStmt tmpQueryStmt = insertStmt_.getQueryStmt().clone();
+ try {
+ Analyzer tmpAnalyzer = new Analyzer(dummyRootAnalyzer);
+ tmpAnalyzer.setUseHiveColLabels(true);
+ tmpQueryStmt.analyze(tmpAnalyzer);
+ // Subqueries need to be rewritten by the StmtRewriter first.
+ if (analyzer.containsSubquery()) return;
+ } finally {
+ // Record missing tables in the original analyzer.
+ analyzer.getMissingTbls().addAll(dummyRootAnalyzer.getMissingTbls());
+ }
+
+ // Add the columns from the partition clause to the create statement.
+ if (partitionKeys_ != null) {
+ int colCnt = tmpQueryStmt.getColLabels().size();
+ int partColCnt = partitionKeys_.size();
+ if (partColCnt >= colCnt) {
+ throw new AnalysisException(String.format("Number of partition columns (%s) " +
+ "must be smaller than the number of columns in the select statement (%s).",
+ partColCnt, colCnt));
+ }
+ int firstCol = colCnt - partColCnt;
+ for (int i = firstCol, j = 0; i < colCnt; ++i, ++j) {
+ String partitionLabel = partitionKeys_.get(j);
+ String colLabel = tmpQueryStmt.getColLabels().get(i);
+
+ // Ensure that partition columns are named and positioned at end of
+ // input column list.
+ if (!partitionLabel.equals(colLabel)) {
+ throw new AnalysisException(String.format("Partition column name " +
+ "mismatch: %s != %s", partitionLabel, colLabel));
+ }
+
+ ColumnDef colDef = new ColumnDef(colLabel, null, null);
+ colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
+ createStmt_.getPartitionColumnDefs().add(colDef);
+ }
+ // Remove partition columns from table column list.
+ tmpQueryStmt.getColLabels().subList(firstCol, colCnt).clear();
+ }
+
+ // Add the columns from the select statement to the create statement.
+ int colCnt = tmpQueryStmt.getColLabels().size();
+ createStmt_.getColumnDefs().clear();
+ for (int i = 0; i < colCnt; ++i) {
+ ColumnDef colDef = new ColumnDef(
+ tmpQueryStmt.getColLabels().get(i), null, null);
+ colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
+ createStmt_.getColumnDefs().add(colDef);
+ }
+ createStmt_.analyze(analyzer);
+
+ if (!SUPPORTED_INSERT_FORMATS.contains(createStmt_.getFileFormat())) {
+ throw new AnalysisException(String.format("CREATE TABLE AS SELECT " +
+ "does not support (%s) file format. Supported formats are: (%s)",
+ createStmt_.getFileFormat().toString().replace("_", ""),
+ "PARQUET, TEXTFILE"));
+ }
+
+ // The full privilege check for the database will be done as part of the INSERT
+ // analysis.
+ Db db = analyzer.getDb(createStmt_.getDb(), Privilege.ANY);
+ if (db == null) {
+ throw new AnalysisException(
+ Analyzer.DB_DOES_NOT_EXIST_ERROR_MSG + createStmt_.getDb());
+ }
+
+ // Running analysis on the INSERT portion of the CTAS requires the target INSERT
+ // table to "exist". For CTAS the table does not exist yet, so create a "temp"
+ // table to run analysis against. The schema of this temp table should exactly
+ // match the schema of the table that will be created by running the CREATE
+ // statement.
+ org.apache.hadoop.hive.metastore.api.Table msTbl =
+ CatalogOpExecutor.createMetaStoreTable(createStmt_.toThrift());
+
+ try (MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient()) {
+      // Set a valid location for this table using the same rules as the metastore. If
+      // the user specified a location for the table, this is a no-op.
+ msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString());
+
+ // Create a "temp" table based off the given metastore.api.Table object. Normally,
+ // the CatalogService assigns all table IDs, but in this case we need to assign the
+ // "temp" table an ID locally. This table ID cannot conflict with any table in the
+ // SelectStmt (or the BE will be very confused). To ensure the ID is unique within
+ // this query, just assign it the invalid table ID. The CatalogServer will assign
+ // this table a proper ID once it is created there as part of the CTAS execution.
+ Table table = Table.fromMetastoreTable(TableId.createInvalidId(), db, msTbl);
+ Preconditions.checkState(table != null &&
+ (table instanceof HdfsTable || table instanceof KuduTable));
+
+ table.load(true, client.getHiveClient(), msTbl);
+ insertStmt_.setTargetTable(table);
+ } catch (TableLoadingException e) {
+ throw new AnalysisException(e.getMessage(), e);
+ } catch (Exception e) {
+ throw new AnalysisException(e.getMessage(), e);
+ }
+
+ // Finally, run analysis on the insert statement.
+ insertStmt_.analyze(analyzer);
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ insertStmt_.reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
new file mode 100644
index 0000000..0faf881
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_API_VER;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_CLASS;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_DATA_SRC_NAME;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_INIT_STRING;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_LOCATION;
+
+import java.util.List;
+import java.util.Map;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.DataSource;
+import com.cloudera.impala.catalog.DataSourceTable;
+import com.cloudera.impala.catalog.RowFormat;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.permission.FsAction;
+
+/**
+ * Represents a CREATE TABLE statement for external data sources. Such tables
+ * reference an external data source (created with a CREATE DATA SOURCE statement)
+ * and the properties of that source are stored in the table properties because
+ * the metastore does not store the data sources themselves.
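+ *
+ * Example (illustrative; names and the init string are hypothetical):
+ *   CREATE TABLE t (id INT, val STRING)
+ *   PRODUCED BY DATA SOURCE my_data_src("init-string");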
+ */
+public class CreateTableDataSrcStmt extends CreateTableStmt {
+
+ public CreateTableDataSrcStmt(TableName tableName, List<ColumnDef> columnDefs,
+ String dataSourceName, String initString, String comment, boolean ifNotExists) {
+ super(tableName, columnDefs, Lists.<ColumnDef>newArrayList(), false, comment,
+ RowFormat.DEFAULT_ROW_FORMAT, THdfsFileFormat.TEXT, null, null, ifNotExists,
+ createInitialTableProperties(dataSourceName, initString),
+ Maps.<String, String>newHashMap(), null);
+ }
+
+ /**
+ * Creates the initial map of table properties containing the name of the data
+ * source and the table init string.
+ */
+ private static Map<String, String> createInitialTableProperties(
+ String dataSourceName, String initString) {
+ Preconditions.checkNotNull(dataSourceName);
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(TBL_PROP_DATA_SRC_NAME, dataSourceName.toLowerCase());
+ tableProperties.put(TBL_PROP_INIT_STRING, Strings.nullToEmpty(initString));
+ return tableProperties;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ super.analyze(analyzer);
+ String dataSourceName = getTblProperties().get(TBL_PROP_DATA_SRC_NAME);
+ DataSource dataSource = analyzer.getCatalog().getDataSource(dataSourceName);
+ if (dataSource == null) {
+ throw new AnalysisException("Data source does not exist: " + dataSourceName);
+ }
+
+ for (ColumnDef col: getColumnDefs()) {
+ if (!DataSourceTable.isSupportedColumnType(col.getType())) {
+ throw new AnalysisException("Tables produced by an external data source do " +
+ "not support the column type: " + col.getType());
+ }
+ }
+ // Add table properties from the DataSource catalog object now that we have access
+ // to the catalog. These are stored in the table metadata because DataSource catalog
+ // objects are not currently persisted.
+ String location = dataSource.getLocation();
+ getTblProperties().put(TBL_PROP_LOCATION, location);
+ getTblProperties().put(TBL_PROP_CLASS, dataSource.getClassName());
+ getTblProperties().put(TBL_PROP_API_VER, dataSource.getApiVersion());
+ new HdfsUri(location).analyze(analyzer, Privilege.ALL, FsAction.READ);
+ // TODO: check class exists and implements API version
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
new file mode 100644
index 0000000..6695cac
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.ArrayType;
+import com.cloudera.impala.catalog.HdfsCompression;
+import com.cloudera.impala.catalog.HdfsFileFormat;
+import com.cloudera.impala.catalog.MapType;
+import com.cloudera.impala.catalog.RowFormat;
+import com.cloudera.impala.catalog.ScalarType;
+import com.cloudera.impala.catalog.StructField;
+import com.cloudera.impala.catalog.StructType;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FileSystemUtil;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+
+/**
+ * Represents a CREATE TABLE tablename LIKE fileformat '/path/to/file' statement
+ * where the schema is inferred from the given file. Does not partition the table by
+ * default.
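+ *
+ * Example (illustrative; the path is hypothetical):
+ *   CREATE TABLE t LIKE PARQUET '/path/to/file.parquet' STORED AS PARQUET;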
+ */
+public class CreateTableLikeFileStmt extends CreateTableStmt {
+ private final HdfsUri schemaLocation_;
+ private final THdfsFileFormat schemaFileFormat_;
+ private final static String ERROR_MSG =
+ "Failed to convert Parquet type\n%s\nto an Impala %s type:\n%s\n";
+
+ public CreateTableLikeFileStmt(TableName tableName, THdfsFileFormat schemaFileFormat,
+ HdfsUri schemaLocation, List<ColumnDef> partitionColumnDescs,
+ boolean isExternal, String comment, RowFormat rowFormat,
+ THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
+ boolean ifNotExists, Map<String, String> tblProperties,
+ Map<String, String> serdeProperties) {
+ super(tableName, new ArrayList<ColumnDef>(), partitionColumnDescs,
+ isExternal, comment, rowFormat,
+ fileFormat, location, cachingOp, ifNotExists, tblProperties, serdeProperties,
+ null);
+ schemaLocation_ = schemaLocation;
+ schemaFileFormat_ = schemaFileFormat;
+ }
+
+ /**
+ * Reads the first block from the given HDFS file and returns the Parquet schema.
+   * Throws an AnalysisException for any failure, such as failing to read the file
+ * or failing to parse the contents.
+ */
+ private static parquet.schema.MessageType loadParquetSchema(Path pathToFile)
+ throws AnalysisException {
+ try {
+ FileSystem fs = pathToFile.getFileSystem(FileSystemUtil.getConfiguration());
+ if (!fs.isFile(pathToFile)) {
+ throw new AnalysisException("Cannot infer schema, path is not a file: " +
+ pathToFile);
+ }
+ } catch (IOException e) {
+ throw new AnalysisException("Failed to connect to filesystem:" + e);
+ } catch (IllegalArgumentException e) {
+ throw new AnalysisException(e.getMessage());
+ }
+ ParquetMetadata readFooter = null;
+ try {
+ readFooter = ParquetFileReader.readFooter(FileSystemUtil.getConfiguration(),
+ pathToFile);
+ } catch (FileNotFoundException e) {
+ throw new AnalysisException("File not found: " + e);
+ } catch (IOException e) {
+ throw new AnalysisException("Failed to open file as a parquet file: " + e);
+ } catch (RuntimeException e) {
+      // Parquet throws a generic RuntimeException when reading a non-Parquet file.
+ if (e.toString().contains("is not a Parquet file")) {
+ throw new AnalysisException("File is not a parquet file: " + pathToFile);
+ }
+      // Otherwise, rethrow whatever we caught.
+ throw e;
+ }
+ return readFooter.getFileMetaData().getSchema();
+ }
+
+ /**
+ * Converts a "primitive" Parquet type to an Impala type.
+ * A primitive type is a non-nested type with no annotations.
+ */
+ private static Type convertPrimitiveParquetType(parquet.schema.Type parquetType)
+ throws AnalysisException {
+ Preconditions.checkState(parquetType.isPrimitive());
+ PrimitiveType prim = parquetType.asPrimitiveType();
+ switch (prim.getPrimitiveTypeName()) {
+ case BINARY: return Type.STRING;
+ case BOOLEAN: return Type.BOOLEAN;
+ case DOUBLE: return Type.DOUBLE;
+ case FIXED_LEN_BYTE_ARRAY:
+ throw new AnalysisException(
+ "Unsupported parquet type FIXED_LEN_BYTE_ARRAY for field " +
+ parquetType.getName());
+ case FLOAT: return Type.FLOAT;
+ case INT32: return Type.INT;
+ case INT64: return Type.BIGINT;
+ case INT96: return Type.TIMESTAMP;
+ default:
+ Preconditions.checkState(false, "Unexpected parquet primitive type: " +
+ prim.getPrimitiveTypeName());
+ return null;
+ }
+ }
+
+ /**
+ * Converts a Parquet group type to an Impala map Type. We support both standard
+ * Parquet map representations, as well as legacy. Legacy representations are handled
+ * according to this specification:
+ * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+ *
+ * Standard representation of a map in Parquet:
+ * <optional | required> group <name> (MAP) { <-- outerGroup is pointing at this
+ * repeated group key_value {
+ * required <key-type> key;
+ * <optional | required> <value-type> value;
+ * }
+ * }
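+   *
+   * A legacy representation handled by the backward-compatibility rules above
+   * (illustrative sketch):
+   * <optional | required> group <name> (MAP) {
+   *   repeated group map (MAP_KEY_VALUE) {
+   *     required <key-type> key;
+   *     <optional | required> <value-type> value;
+   *   }
+   * }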
+ */
+ private static MapType convertMap(parquet.schema.GroupType outerGroup)
+ throws AnalysisException {
+    if (outerGroup.getFieldCount() != 1) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The logical MAP type must have exactly 1 inner field."));
+ }
+
+ parquet.schema.Type innerField = outerGroup.getType(0);
+    if (!innerField.isRepetition(parquet.schema.Type.Repetition.REPEATED)) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The logical MAP type must have a repeated inner field."));
+ }
+ if (innerField.isPrimitive()) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The inner field of the logical MAP type must be a group."));
+ }
+
+ parquet.schema.GroupType innerGroup = innerField.asGroupType();
+    // It does not matter whether innerGroup has an annotation or not (for example, it
+    // may be annotated with MAP_KEY_VALUE); the annotated and unannotated cases are
+    // treated the same.
+ if (innerGroup.getFieldCount() != 2) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The inner field of the logical MAP type must have exactly 2 fields."));
+ }
+
+ parquet.schema.Type key = innerGroup.getType(0);
+ if (!key.getName().equals("key")) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The name of the first field of the inner field of the logical MAP " +
+ "type must be 'key'"));
+ }
+ if (!key.isPrimitive()) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The key type of the logical MAP type must be primitive."));
+ }
+ parquet.schema.Type value = innerGroup.getType(1);
+ if (!value.getName().equals("value")) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "MAP", "The name of the second field of the inner field of the logical MAP " +
+ "type must be 'value'"));
+ }
+
+ return new MapType(convertParquetType(key), convertParquetType(value));
+ }
+
+ /**
+ * Converts a Parquet group type to an Impala struct Type.
+ */
+ private static StructType convertStruct(parquet.schema.GroupType outerGroup)
+ throws AnalysisException {
+ ArrayList<StructField> structFields = new ArrayList<StructField>();
+ for (parquet.schema.Type field: outerGroup.getFields()) {
+ StructField f = new StructField(field.getName(), convertParquetType(field));
+ structFields.add(f);
+ }
+ return new StructType(structFields);
+ }
+
+ /**
+ * Converts a Parquet group type to an Impala array Type. We can handle the standard
+ * representation, but also legacy representations for backwards compatibility.
+ * Legacy representations are handled according to this specification:
+ * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+ *
+ * Standard representation of an array in Parquet:
+ * <optional | required> group <name> (LIST) { <-- outerGroup is pointing at this
+ * repeated group list {
+ * <optional | required> <element-type> element;
+ * }
+ * }
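+   *
+   * A legacy 2-level representation handled by the backward-compatibility rules above
+   * (illustrative sketch):
+   * <optional | required> group <name> (LIST) {
+   *   repeated <element-type> <name>;
+   * }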
+ */
+ private static ArrayType convertArray(parquet.schema.GroupType outerGroup)
+ throws AnalysisException {
+ if (outerGroup.getFieldCount() != 1) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "LIST", "The logical LIST type must have exactly 1 inner field."));
+ }
+
+ parquet.schema.Type innerField = outerGroup.getType(0);
+ if (!innerField.isRepetition(parquet.schema.Type.Repetition.REPEATED)) {
+ throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+ "LIST", "The inner field of the logical LIST type must be repeated."));
+ }
+ if (innerField.isPrimitive() || innerField.getOriginalType() != null) {
+ // From the Parquet Spec:
+      // 1. If the repeated field is not a group, then its type is the element type.
+ //
+ // If innerField is a group, but originalType is not null, the element type is
+ // based on the logical type.
+ return new ArrayType(convertParquetType(innerField));
+ }
+
+ parquet.schema.GroupType innerGroup = innerField.asGroupType();
+ if (innerGroup.getFieldCount() != 1) {
+ // From the Parquet Spec:
+      // 2. If the repeated field is a group with multiple fields, then its type is a
+      // struct.
+ return new ArrayType(convertStruct(innerGroup));
+ }
+
+ return new ArrayType(convertParquetType(innerGroup.getType(0)));
+ }
+
+ /**
+ * Converts a "logical" Parquet type to an Impala column type.
+ * A Parquet type is considered logical when it has an annotation. The annotation is
+ * stored as a "OriginalType". The Parquet documentation refers to these as logical
+ * types, so we use that terminology here.
+ */
+ private static Type convertLogicalParquetType(parquet.schema.Type parquetType)
+ throws AnalysisException {
+ OriginalType orig = parquetType.getOriginalType();
+ if (orig == OriginalType.LIST) {
+ return convertArray(parquetType.asGroupType());
+ }
+ if (orig == OriginalType.MAP || orig == OriginalType.MAP_KEY_VALUE) {
+ // MAP_KEY_VALUE annotation should not be used any more. However, according to the
+ // Parquet spec, some existing data incorrectly uses MAP_KEY_VALUE in place of MAP.
+ // For backward-compatibility, a group annotated with MAP_KEY_VALUE that is not
+ // contained by a MAP-annotated group should be handled as a MAP-annotated group.
+ return convertMap(parquetType.asGroupType());
+ }
+
+ PrimitiveType prim = parquetType.asPrimitiveType();
+ if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY &&
+ orig == OriginalType.UTF8) {
+ // UTF8 is the type annotation Parquet uses for strings
+ // We check to make sure it applies to BINARY to avoid errors if there is a bad
+ // annotation.
+ return Type.STRING;
+ }
+
+ if (orig == OriginalType.DECIMAL) {
+ return ScalarType.createDecimalType(prim.getDecimalMetadata().getPrecision(),
+ prim.getDecimalMetadata().getScale());
+ }
+
+ throw new AnalysisException(
+ "Unsupported logical parquet type " + orig + " (primitive type is " +
+ prim.getPrimitiveTypeName().name() + ") for field " +
+ parquetType.getName());
+ }
+
+ /**
+ * Converts a Parquet type into an Impala type.
+ */
+ private static Type convertParquetType(parquet.schema.Type field)
+ throws AnalysisException {
+ Type type = null;
+    // TODO for 2.3: If a field is not annotated with LIST, it can sometimes still be
+    // interpreted as an array. The following 2 examples should be interpreted as an
+    // array of integers, but this is currently not done.
+ // 1. repeated int int_col;
+ // 2. required group int_arr {
+ // repeated group list {
+ // required int element;
+ // }
+ // }
+ if (field.getOriginalType() != null) {
+ type = convertLogicalParquetType(field);
+ } else if (field.isPrimitive()) {
+ type = convertPrimitiveParquetType(field);
+ } else {
+ // If field is not primitive, it must be a struct.
+ type = convertStruct(field.asGroupType());
+ }
+ return type;
+ }
+
+ /**
+ * Parses a Parquet file stored in HDFS and returns the corresponding Impala schema.
+   * This fails with an AnalysisException if any errors occur while reading the file or
+   * parsing the Parquet schema, or if the Parquet types cannot be represented in Impala.
+ */
+ private static List<ColumnDef> extractParquetSchema(HdfsUri location)
+ throws AnalysisException {
+ parquet.schema.MessageType parquetSchema = loadParquetSchema(location.getPath());
+ List<parquet.schema.Type> fields = parquetSchema.getFields();
+ List<ColumnDef> schema = new ArrayList<ColumnDef>();
+
+ for (parquet.schema.Type field: fields) {
+ Type type = convertParquetType(field);
+ Preconditions.checkNotNull(type);
+ String colName = field.getName();
+ schema.add(new ColumnDef(colName, new TypeDef(type),
+ "Inferred from Parquet file."));
+ }
+ return schema;
+ }
+
+ @Override
+ public String toSql() {
+ ArrayList<String> colsSql = Lists.newArrayList();
+ ArrayList<String> partitionColsSql = Lists.newArrayList();
+ HdfsCompression compression = HdfsCompression.fromFileName(
+ schemaLocation_.toString());
+ String s = ToSqlUtils.getCreateTableSql(getDb(),
+ getTbl() + " __LIKE_FILEFORMAT__ ", getComment(), colsSql, partitionColsSql,
+ getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
+ getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()),
+ compression, null, getLocation());
+ s = s.replace("__LIKE_FILEFORMAT__", "LIKE " + schemaFileFormat_ + " " +
+ schemaLocation_.toString());
+ return s;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ schemaLocation_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+ switch (schemaFileFormat_) {
+ case PARQUET:
+ getColumnDefs().addAll(extractParquetSchema(schemaLocation_));
+ break;
+ default:
+ throw new AnalysisException("Unsupported file type for schema inference: "
+ + schemaFileFormat_);
+ }
+ super.analyze(analyzer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
new file mode 100644
index 0000000..a7e2038
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import org.apache.hadoop.fs.permission.FsAction;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TAccessEvent;
+import com.cloudera.impala.thrift.TCatalogObjectType;
+import com.cloudera.impala.thrift.TCreateTableLikeParams;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.cloudera.impala.thrift.TTableName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE TABLE LIKE statement which creates a new table based on
+ * a copy of an existing table definition.
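+ *
+ * Example (illustrative; names and the path are hypothetical):
+ *   CREATE EXTERNAL TABLE t LIKE db.src STORED AS PARQUET LOCATION '/warehouse/t';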
+ */
+public class CreateTableLikeStmt extends StatementBase {
+ private final TableName tableName_;
+ private final TableName srcTableName_;
+ private final boolean isExternal_;
+ private final String comment_;
+ private final THdfsFileFormat fileFormat_;
+ private final HdfsUri location_;
+ private final boolean ifNotExists_;
+
+ // Set during analysis
+ private String dbName_;
+ private String srcDbName_;
+ private String owner_;
+
+ /**
+ * Builds a CREATE TABLE LIKE statement
+ * @param tableName - Name of the new table
+ * @param srcTableName - Name of the source table (table to copy)
+ * @param isExternal - If true, the table's data will be preserved if dropped.
+ * @param comment - Comment to attach to the table
+ * @param fileFormat - File format of the table
+   * @param location - The HDFS location where the table data will be stored.
+ * @param ifNotExists - If true, no errors are thrown if the table already exists
+ */
+ public CreateTableLikeStmt(TableName tableName, TableName srcTableName,
+ boolean isExternal, String comment, THdfsFileFormat fileFormat, HdfsUri location,
+ boolean ifNotExists) {
+ Preconditions.checkNotNull(tableName);
+ Preconditions.checkNotNull(srcTableName);
+ this.tableName_ = tableName;
+ this.srcTableName_ = srcTableName;
+ this.isExternal_ = isExternal;
+ this.comment_ = comment;
+ this.fileFormat_ = fileFormat;
+ this.location_ = location;
+ this.ifNotExists_ = ifNotExists;
+ }
+
+ public String getTbl() { return tableName_.getTbl(); }
+ public String getSrcTbl() { return srcTableName_.getTbl(); }
+ public boolean isExternal() { return isExternal_; }
+ public boolean getIfNotExists() { return ifNotExists_; }
+ public String getComment() { return comment_; }
+ public THdfsFileFormat getFileFormat() { return fileFormat_; }
+ public HdfsUri getLocation() { return location_; }
+
+ /**
+ * Can only be called after analysis, returns the name of the database the table will
+ * be created within.
+ */
+ public String getDb() {
+ Preconditions.checkNotNull(dbName_);
+ return dbName_;
+ }
+
+ /**
+   * Can only be called after analysis, returns the name of the database the source
+   * table (the table being copied) belongs to.
+ */
+ public String getSrcDb() {
+ Preconditions.checkNotNull(srcDbName_);
+ return srcDbName_;
+ }
+
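+  /**
+   * Can only be called after analysis, returns the owner of this table (the user from
+   * the current session).
+   */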
+ public String getOwner() {
+ Preconditions.checkNotNull(owner_);
+ return owner_;
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder("CREATE ");
+ if (isExternal_) sb.append("EXTERNAL ");
+ sb.append("TABLE ");
+ if (ifNotExists_) sb.append("IF NOT EXISTS ");
+ if (tableName_.getDb() != null) sb.append(tableName_.getDb() + ".");
+ sb.append(tableName_.getTbl() + " LIKE ");
+ if (srcTableName_.getDb() != null) sb.append(srcTableName_.getDb() + ".");
+ sb.append(srcTableName_.getTbl());
+ if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'");
+ if (fileFormat_ != null) sb.append(" STORED AS " + fileFormat_);
+ if (location_ != null) sb.append(" LOCATION '" + location_ + "'");
+ return sb.toString();
+ }
+
+ public TCreateTableLikeParams toThrift() {
+ TCreateTableLikeParams params = new TCreateTableLikeParams();
+ params.setTable_name(new TTableName(getDb(), getTbl()));
+ params.setSrc_table_name(new TTableName(getSrcDb(), getSrcTbl()));
+ params.setOwner(getOwner());
+ params.setIs_external(isExternal());
+ params.setComment(comment_);
+ if (fileFormat_ != null) params.setFile_format(fileFormat_);
+ params.setLocation(location_ == null ? null : location_.toString());
+ params.setIf_not_exists(getIfNotExists());
+ return params;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+ Preconditions.checkState(srcTableName_ != null && !srcTableName_.isEmpty());
+ // Make sure the source table exists and the user has permission to access it.
+ srcDbName_ = analyzer
+ .getTable(srcTableName_, Privilege.VIEW_METADATA)
+ .getDb().getName();
+ tableName_.analyze();
+ dbName_ = analyzer.getTargetDbName(tableName_);
+ owner_ = analyzer.getUser().getName();
+
+ if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) &&
+ !ifNotExists_) {
+ throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG +
+ String.format("%s.%s", dbName_, getTbl()));
+ }
+ analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(),
+ TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
+
+ if (location_ != null) {
+ location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
new file mode 100644
index 0000000..f7b683f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.fs.permission.FsAction;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.HdfsStorageDescriptor;
+import com.cloudera.impala.catalog.KuduTable;
+import com.cloudera.impala.catalog.RowFormat;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FileSystemUtil;
+import com.cloudera.impala.thrift.TAccessEvent;
+import com.cloudera.impala.thrift.TCatalogObjectType;
+import com.cloudera.impala.thrift.TCreateTableParams;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.cloudera.impala.thrift.TTableName;
+import com.cloudera.impala.util.AvroSchemaConverter;
+import com.cloudera.impala.util.AvroSchemaParser;
+import com.cloudera.impala.util.AvroSchemaUtils;
+import com.cloudera.impala.util.KuduUtil;
+import com.cloudera.impala.util.MetaStoreUtil;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Represents a CREATE TABLE statement.
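+ *
+ * Example (illustrative; names, formats, and paths are hypothetical):
+ *   CREATE EXTERNAL TABLE t (id INT, name STRING)
+ *   PARTITIONED BY (year INT)
+ *   ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ *   STORED AS TEXTFILE LOCATION '/warehouse/t'
+ *   TBLPROPERTIES ('key'='value');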
+ */
+public class CreateTableStmt extends StatementBase {
+ private List<ColumnDef> columnDefs_;
+ private final String comment_;
+ private final boolean isExternal_;
+ private final boolean ifNotExists_;
+ private final THdfsFileFormat fileFormat_;
+ private final ArrayList<ColumnDef> partitionColDefs_;
+ private final RowFormat rowFormat_;
+ private TableName tableName_;
+ private final Map<String, String> tblProperties_;
+ private final Map<String, String> serdeProperties_;
+ private final HdfsCachingOp cachingOp_;
+ private HdfsUri location_;
+ private final List<DistributeParam> distributeParams_;
+
+ // Set during analysis
+ private String owner_;
+
+ /**
+ * Builds a CREATE TABLE statement
+ * @param tableName - Name of the new table
+ * @param columnDefs - List of column definitions for the table
+ * @param partitionColumnDefs - List of partition column definitions for the table
+ * @param isExternal - If true, the table's data will be preserved if dropped.
+ * @param comment - Comment to attach to the table
+   * @param rowFormat - Custom row format of the table. Use RowFormat.DEFAULT_ROW_FORMAT
+   *          to specify the default row format.
+   * @param fileFormat - File format of the table
+   * @param location - The HDFS location where the table data will be stored.
+ * @param cachingOp - The HDFS caching op that should be applied to this table.
+ * @param ifNotExists - If true, no errors are thrown if the table already exists.
+ * @param tblProperties - Optional map of key/values to persist with table metadata.
+ * @param serdeProperties - Optional map of key/values to persist with table serde
+ * metadata.
+ */
+ public CreateTableStmt(TableName tableName, List<ColumnDef> columnDefs,
+ List<ColumnDef> partitionColumnDefs, boolean isExternal, String comment,
+ RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location,
+ HdfsCachingOp cachingOp, boolean ifNotExists, Map<String, String> tblProperties,
+ Map<String, String> serdeProperties, List<DistributeParam> distributeParams) {
+ Preconditions.checkNotNull(columnDefs);
+ Preconditions.checkNotNull(partitionColumnDefs);
+ Preconditions.checkNotNull(fileFormat);
+ Preconditions.checkNotNull(rowFormat);
+ Preconditions.checkNotNull(tableName);
+
+ columnDefs_ = Lists.newArrayList(columnDefs);
+ comment_ = comment;
+ isExternal_ = isExternal;
+ ifNotExists_ = ifNotExists;
+ fileFormat_ = fileFormat;
+ location_ = location;
+ cachingOp_ = cachingOp;
+ partitionColDefs_ = Lists.newArrayList(partitionColumnDefs);
+ rowFormat_ = rowFormat;
+ tableName_ = tableName;
+ tblProperties_ = tblProperties;
+ serdeProperties_ = serdeProperties;
+ unescapeProperties(tblProperties_);
+ unescapeProperties(serdeProperties_);
+ distributeParams_ = distributeParams;
+ }
+
+ /**
+ * Copy c'tor.
+ */
+ public CreateTableStmt(CreateTableStmt other) {
+ columnDefs_ = Lists.newArrayList(other.columnDefs_);
+ comment_ = other.comment_;
+ isExternal_ = other.isExternal_;
+ ifNotExists_ = other.ifNotExists_;
+ fileFormat_ = other.fileFormat_;
+ location_ = other.location_;
+ cachingOp_ = other.cachingOp_;
+ partitionColDefs_ = Lists.newArrayList(other.partitionColDefs_);
+ rowFormat_ = other.rowFormat_;
+ tableName_ = other.tableName_;
+ tblProperties_ = other.tblProperties_;
+ serdeProperties_ = other.serdeProperties_;
+ distributeParams_ = other.distributeParams_;
+ }
+
+ @Override
+ public CreateTableStmt clone() { return new CreateTableStmt(this); }
+
+ public String getTbl() { return tableName_.getTbl(); }
+ public TableName getTblName() { return tableName_; }
+ public List<ColumnDef> getColumnDefs() { return columnDefs_; }
+ public List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
+ public String getComment() { return comment_; }
+ public boolean isExternal() { return isExternal_; }
+ public boolean getIfNotExists() { return ifNotExists_; }
+ public HdfsUri getLocation() { return location_; }
+ public void setLocation(HdfsUri location) { this.location_ = location; }
+ public THdfsFileFormat getFileFormat() { return fileFormat_; }
+ public RowFormat getRowFormat() { return rowFormat_; }
+ public Map<String, String> getTblProperties() { return tblProperties_; }
+ public Map<String, String> getSerdeProperties() { return serdeProperties_; }
+
+ /**
+ * Can only be called after analysis, returns the owner of this table (the user from
+ * the current session).
+ */
+ public String getOwner() {
+ Preconditions.checkNotNull(owner_);
+ return owner_;
+ }
+
+ /**
+ * Can only be called after analysis, returns the name of the database the table will
+ * be created within.
+ */
+ public String getDb() {
+ Preconditions.checkState(isAnalyzed());
+ return tableName_.getDb();
+ }
+
+ @Override
+ public String toSql() { return ToSqlUtils.getCreateTableSql(this); }
+
+ public TCreateTableParams toThrift() {
+ TCreateTableParams params = new TCreateTableParams();
+ params.setTable_name(new TTableName(getDb(), getTbl()));
+ for (ColumnDef col: getColumnDefs()) {
+ params.addToColumns(col.toThrift());
+ }
+ for (ColumnDef col: getPartitionColumnDefs()) {
+ params.addToPartition_columns(col.toThrift());
+ }
+ params.setOwner(getOwner());
+ params.setIs_external(isExternal());
+ params.setComment(comment_);
+ params.setLocation(location_ == null ? null : location_.toString());
+ if (cachingOp_ != null) params.setCache_op(cachingOp_.toThrift());
+ params.setRow_format(rowFormat_.toThrift());
+ params.setFile_format(fileFormat_);
+ params.setIf_not_exists(getIfNotExists());
+ if (tblProperties_ != null) params.setTable_properties(tblProperties_);
+ if (serdeProperties_ != null) params.setSerde_properties(serdeProperties_);
+ if (distributeParams_ != null) {
+ for (DistributeParam d : distributeParams_) {
+ params.addToDistribute_by(d.toThrift());
+ }
+ }
+ return params;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ super.analyze(analyzer);
+ Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+ tableName_ = analyzer.getFqTableName(tableName_);
+ tableName_.analyze();
+ owner_ = analyzer.getUser().getName();
+
+ MetaStoreUtil.checkShortPropertyMap("Property", tblProperties_);
+ MetaStoreUtil.checkShortPropertyMap("Serde property", serdeProperties_);
+
+ if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(),
+ Privilege.CREATE) && !ifNotExists_) {
+ throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_);
+ }
+
+ analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(),
+ TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
+
+ // Only Avro tables can have empty column defs because they can infer them from
+ // the Avro schema.
+ if (columnDefs_.isEmpty() && fileFormat_ != THdfsFileFormat.AVRO) {
+ throw new AnalysisException("Table requires at least 1 column");
+ }
+
+ if (location_ != null) {
+ location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+ }
+
+ analyzeRowFormat(analyzer);
+
+ // Check that all the column names are valid and unique.
+ analyzeColumnDefs(analyzer);
+
+ if (getTblProperties() != null && KuduTable.KUDU_STORAGE_HANDLER.equals(
+ getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
+ analyzeKuduTable(analyzer);
+ } else if (distributeParams_ != null) {
+ throw new AnalysisException("Only Kudu tables can use DISTRIBUTE BY clause.");
+ }
+
+ if (fileFormat_ == THdfsFileFormat.AVRO) {
+ columnDefs_ = analyzeAvroSchema(analyzer);
+ if (columnDefs_.isEmpty()) {
+ throw new AnalysisException(
+ "An Avro table requires column definitions or an Avro schema.");
+ }
+ AvroSchemaUtils.setFromSerdeComment(columnDefs_);
+ analyzeColumnDefs(analyzer);
+ }
+
+ if (cachingOp_ != null) {
+ cachingOp_.analyze(analyzer);
+ if (cachingOp_.shouldCache() && location_ != null &&
+ !FileSystemUtil.isPathCacheable(location_.getPath())) {
+ throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
+ "Please retry without caching: CREATE TABLE %s ... UNCACHED",
+ location_.toString(), tableName_));
+ }
+ }
+
+    // Analyze 'skip.header.line.count' property.
+ if (tblProperties_ != null) {
+ AlterTableSetTblProperties.analyzeSkipHeaderLineCount(tblProperties_);
+ }
+ }
+
+ private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
+ Byte fieldDelim = analyzeRowFormatValue(rowFormat_.getFieldDelimiter());
+ Byte lineDelim = analyzeRowFormatValue(rowFormat_.getLineDelimiter());
+ Byte escapeChar = analyzeRowFormatValue(rowFormat_.getEscapeChar());
+ if (fileFormat_ == THdfsFileFormat.TEXT) {
+ if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM;
+ if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM;
+ if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR;
+ if (fieldDelim != null && lineDelim != null && fieldDelim.equals(lineDelim)) {
+ throw new AnalysisException("Field delimiter and line delimiter have same " +
+ "value: byte " + fieldDelim);
+ }
+ if (fieldDelim != null && escapeChar != null && fieldDelim.equals(escapeChar)) {
+ analyzer.addWarning("Field delimiter and escape character have same value: " +
+ "byte " + fieldDelim + ". Escape character will be ignored");
+ }
+ if (lineDelim != null && escapeChar != null && lineDelim.equals(escapeChar)) {
+ analyzer.addWarning("Line delimiter and escape character have same value: " +
+ "byte " + lineDelim + ". Escape character will be ignored");
+ }
+ }
+ }
+
+ /**
+ * Analyzes columnDefs_ and partitionColDefs_ checking whether all column
+ * names are unique.
+ */
+ private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
+ Set<String> colNames = Sets.newHashSet();
+ for (ColumnDef colDef: columnDefs_) {
+ colDef.analyze();
+ if (!colNames.add(colDef.getColName().toLowerCase())) {
+ throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+ }
+ }
+ for (ColumnDef colDef: partitionColDefs_) {
+ colDef.analyze();
+ if (!colDef.getType().supportsTablePartitioning()) {
+ throw new AnalysisException(
+ String.format("Type '%s' is not supported as partition-column type " +
+ "in column: %s", colDef.getType().toSql(), colDef.getColName()));
+ }
+ if (!colNames.add(colDef.getColName().toLowerCase())) {
+ throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+ }
+ }
+ }
+
+ /**
+ * Analyzes the Avro schema and compares it with the columnDefs_ to detect
+ * inconsistencies. Returns a list of column descriptors that should be
+ * used for creating the table (possibly identical to columnDefs_).
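+   *
+   * For example, the schema may be set explicitly in the table properties
+   * (illustrative; the path is hypothetical):
+   *   CREATE TABLE t STORED AS AVRO
+   *   TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/t.avsc');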
+ */
+ private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer)
+ throws AnalysisException {
+ Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO);
+    // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
+    // taking precedence.
+ List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
+ schemaSearchLocations.add(serdeProperties_);
+ schemaSearchLocations.add(tblProperties_);
+ String avroSchema = null;
+ List<ColumnDef> avroCols = null; // parsed from avroSchema
+ try {
+ avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
+ if (avroSchema == null) {
+ // No Avro schema was explicitly set in the serde or table properties, so infer
+ // the Avro schema from the column definitions.
+ Schema inferredSchema = AvroSchemaConverter.convertColumnDefs(
+ columnDefs_, tableName_.toString());
+ avroSchema = inferredSchema.toString();
+ }
+ if (Strings.isNullOrEmpty(avroSchema)) {
+ throw new AnalysisException("Avro schema is null or empty: " +
+ tableName_.toString());
+ }
+ avroCols = AvroSchemaParser.parse(avroSchema);
+ } catch (SchemaParseException e) {
+ throw new AnalysisException(String.format(
+ "Error parsing Avro schema for table '%s': %s", tableName_.toString(),
+ e.getMessage()));
+ }
+ Preconditions.checkNotNull(avroCols);
+
+ // Analyze the Avro schema to detect inconsistencies with the columnDefs_.
+ // In case of inconsistencies, the column defs are ignored in favor of the Avro
+ // schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104).
+ StringBuilder warning = new StringBuilder();
+ List<ColumnDef> reconciledColDefs =
+ AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning);
+ if (warning.length() > 0) analyzer.addWarning(warning.toString());
+ return reconciledColDefs;
+ }
+
+ private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException {
+ // Validate that Kudu table is correctly specified.
+ if (!KuduTable.tableParamsAreValid(getTblProperties())) {
+ throw new AnalysisException("Kudu table is missing parameters " +
+ String.format("in table properties. Please verify if %s, %s, and %s are "
+ + "present and have valid values.",
+ KuduTable.KEY_TABLE_NAME, KuduTable.KEY_MASTER_ADDRESSES,
+ KuduTable.KEY_KEY_COLUMNS));
+ }
+
+ // Kudu table cannot be a cached table
+ if (cachingOp_ != null) {
+ throw new AnalysisException("A Kudu table cannot be cached in HDFS.");
+ }
+
+ if (distributeParams_ != null) {
+ if (isExternal_) {
+ throw new AnalysisException(
+ "The DISTRIBUTE BY clause may not be specified for external tables.");
+ }
+
+ List<String> keyColumns = KuduUtil.parseKeyColumnsAsList(
+ getTblProperties().get(KuduTable.KEY_KEY_COLUMNS));
+ for (DistributeParam d : distributeParams_) {
+ // If the columns are not set, default to all key columns
+ if (d.getColumns() == null) d.setColumns(keyColumns);
+ d.analyze(analyzer);
+ }
+ } else if (!isExternal_) {
+ throw new AnalysisException(
+ "A data distribution must be specified using the DISTRIBUTE BY clause.");
+ }
+ }
+
+ private Byte analyzeRowFormatValue(String value) throws AnalysisException {
+ if (value == null) return null;
+ Byte byteVal = HdfsStorageDescriptor.parseDelim(value);
+ if (byteVal == null) {
+ throw new AnalysisException("ESCAPED BY values and LINE/FIELD " +
+ "terminators must be specified as a single character or as a decimal " +
+ "value in the range [-128:127]: " + value);
+ }
+ return byteVal;
+ }
+
+ /**
+ * Unescapes all values in the property map.
+ */
+ public static void unescapeProperties(Map<String, String> propertyMap) {
+ if (propertyMap == null) return;
+ for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
+ propertyMap.put(kv.getKey(),
+ new StringLiteral(kv.getValue()).getUnescapedValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java
new file mode 100644
index 0000000..46b0003
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.cloudera.impala.catalog.AggregateFunction;
+import com.cloudera.impala.catalog.Function;
+import com.cloudera.impala.catalog.PrimitiveType;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TFunctionBinaryType;
+import com.cloudera.impala.thrift.TSymbolType;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE AGGREGATE FUNCTION statement.
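+ *
+ * Example (illustrative; names, types, and the path are hypothetical):
+ *   CREATE AGGREGATE FUNCTION my_sum(INT) RETURNS BIGINT
+ *   INTERMEDIATE BIGINT LOCATION '/udfs/libuda.so' UPDATE_FN='MySumUpdate';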
+ */
+public class CreateUdaStmt extends CreateFunctionStmtBase {
+ private final TypeDef intermediateTypeDef_;
+
+ /**
+ * Builds a CREATE AGGREGATE FUNCTION statement
+   * @param fnSymbol - Name of the function
+   * @param args - List of types for the arguments to this function
+   * @param retTypeDef - The type this function returns.
+   * @param intermediateTypeDef - The type used for the intermediate data.
+   * @param location - Path in HDFS containing the UDA.
+   * @param ifNotExists - If true, no errors are thrown if the function already exists
+   * @param optArgs - Key/value pairs for additional arguments. The keys are
+   *        validated in analyze()
+ */
+ public CreateUdaStmt(FunctionName fnSymbol, FunctionArgs args,
+ TypeDef retTypeDef, TypeDef intermediateTypeDef,
+ HdfsUri location, boolean ifNotExists,
+ HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) {
+ super(fnSymbol, args, retTypeDef, location, ifNotExists, optArgs);
+ intermediateTypeDef_ = intermediateTypeDef;
+ }
+
+ private void reportCouldNotInferSymbol(String function) throws AnalysisException {
+ throw new AnalysisException("Could not infer symbol for "
+ + function + "() function.");
+ }
+
+ // Gets the symbol for 'arg'. If the user explicitly set it in the DDL, returns
+ // that. Otherwise tries to infer the symbol from the update function: the update
+ // symbol must contain "update" or "Update", which is swapped out for
+ // 'defaultSymbol'. For example, the update symbol "AggUpdate" with defaultSymbol
+ // "init" yields "AggInit". Returns null if no symbol could be inferred.
+ private String getSymbolSymbol(OptArg arg, String defaultSymbol) {
+ // First, check whether the user explicitly set it.
+ if (optArgs_.get(arg) != null) return optArgs_.get(arg);
+ // Try to match it from Update
+ String updateFn = optArgs_.get(OptArg.UPDATE_FN);
+ // Mangled symbols start with _Z. We can't substitute symbols in mangled
+ // strings.
+ // TODO: this is doable in the BE with more symbol parsing.
+ if (updateFn.startsWith("_Z")) return null;
+
+ if (updateFn.contains("update")) return updateFn.replace("update", defaultSymbol);
+ if (updateFn.contains("Update")) {
+ char[] array = defaultSymbol.toCharArray();
+ array[0] = Character.toUpperCase(array[0]);
+ String s = new String(array);
+ return updateFn.replace("Update", s);
+ }
+ return null;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ super.analyze(analyzer);
+ Preconditions.checkNotNull(fn_);
+ Preconditions.checkState(fn_ instanceof AggregateFunction);
+ AggregateFunction uda = (AggregateFunction) fn_;
+
+ if (uda.getNumArgs() == 0) {
+ throw new AnalysisException("UDAs must take at least one argument.");
+ }
+
+ if (uda.getBinaryType() == TFunctionBinaryType.JAVA) {
+ throw new AnalysisException("Java UDAs are not supported.");
+ }
+
+ // TODO: these are temporary restrictions since the BE cannot yet
+ // execute them.
+ if (uda.getBinaryType() == TFunctionBinaryType.IR) {
+ throw new AnalysisException("IR UDAs are not yet supported.");
+ }
+ if (fn_.hasVarArgs()) {
+ throw new AnalysisException("UDAs with varargs are not yet supported.");
+ }
+ if (fn_.getNumArgs() > 8) {
+ throw new AnalysisException(
+ "UDAs with more than 8 arguments are not yet supported.");
+ }
+
+ if (uda.getReturnType().getPrimitiveType() == PrimitiveType.CHAR) {
+ throw new AnalysisException("UDAs with CHAR return type are not yet supported.");
+ }
+ if (uda.getReturnType().getPrimitiveType() == PrimitiveType.VARCHAR) {
+ throw new AnalysisException("UDAs with VARCHAR return type are not yet supported.");
+ }
+ for (int i = 0; i < uda.getNumArgs(); ++i) {
+ if (uda.getArgs()[i].getPrimitiveType() == PrimitiveType.CHAR) {
+ throw new AnalysisException("UDAs with CHAR arguments are not yet supported.");
+ }
+ if (uda.getArgs()[i].getPrimitiveType() == PrimitiveType.VARCHAR) {
+ throw new AnalysisException("UDAs with VARCHAR arguments are not yet supported.");
+ }
+ }
+
+ Type intermediateType = null;
+ if (intermediateTypeDef_ == null) {
+ intermediateType = uda.getReturnType();
+ } else {
+ intermediateTypeDef_.analyze(analyzer);
+ intermediateType = intermediateTypeDef_.getType();
+ }
+ uda.setIntermediateType(intermediateType);
+
+ // Check that arguments that are only valid for UDFs are not set.
+ checkOptArgNotSet(OptArg.SYMBOL);
+ checkOptArgNotSet(OptArg.PREPARE_FN);
+ checkOptArgNotSet(OptArg.CLOSE_FN);
+
+ // The user must provide the symbol for Update.
+ uda.setUpdateFnSymbol(uda.lookupSymbol(
+ checkAndGetOptArg(OptArg.UPDATE_FN), TSymbolType.UDF_EVALUATE, intermediateType,
+ uda.hasVarArgs(), uda.getArgs()));
+
+ // If the DDL did not specify the init/serialize/merge/finalize function
+ // symbols, guess them based on the update function's symbol.
+ Preconditions.checkNotNull(uda.getUpdateFnSymbol());
+ uda.setInitFnSymbol(getSymbolSymbol(OptArg.INIT_FN, "init"));
+ uda.setSerializeFnSymbol(getSymbolSymbol(OptArg.SERIALIZE_FN, "serialize"));
+ uda.setMergeFnSymbol(getSymbolSymbol(OptArg.MERGE_FN, "merge"));
+ uda.setFinalizeFnSymbol(getSymbolSymbol(OptArg.FINALIZE_FN, "finalize"));
+
+ // Init and merge are required.
+ if (uda.getInitFnSymbol() == null) reportCouldNotInferSymbol("init");
+ if (uda.getMergeFnSymbol() == null) reportCouldNotInferSymbol("merge");
+
+ // Validate that all set symbols exist.
+ uda.setInitFnSymbol(uda.lookupSymbol(uda.getInitFnSymbol(),
+ TSymbolType.UDF_EVALUATE, intermediateType, false));
+ uda.setMergeFnSymbol(uda.lookupSymbol(uda.getMergeFnSymbol(),
+ TSymbolType.UDF_EVALUATE, intermediateType, false, intermediateType));
+ if (uda.getSerializeFnSymbol() != null) {
+ try {
+ uda.setSerializeFnSymbol(uda.lookupSymbol(uda.getSerializeFnSymbol(),
+ TSymbolType.UDF_EVALUATE, null, false, intermediateType));
+ } catch (AnalysisException e) {
+ if (optArgs_.get(OptArg.SERIALIZE_FN) != null) {
+ throw e;
+ } else {
+ // Ignore, these symbols are optional.
+ uda.setSerializeFnSymbol(null);
+ }
+ }
+ }
+ if (uda.getFinalizeFnSymbol() != null) {
+ try {
+ uda.setFinalizeFnSymbol(uda.lookupSymbol(
+ uda.getFinalizeFnSymbol(), TSymbolType.UDF_EVALUATE, null, false,
+ intermediateType));
+ } catch (AnalysisException e) {
+ if (optArgs_.get(OptArg.FINALIZE_FN) != null) {
+ throw e;
+ } else {
+ // Ignore, these symbols are optional.
+ uda.setFinalizeFnSymbol(null);
+ }
+ }
+ }
+
+ // If the intermediate type is not the return type, then finalize is
+ // required.
+ if (!intermediateType.equals(fn_.getReturnType()) &&
+ uda.getFinalizeFnSymbol() == null) {
+ throw new AnalysisException("Finalize() is required for this UDA.");
+ }
+
+ sqlString_ = uda.toSql(ifNotExists_);
+ }
+
+ @Override
+ protected Function createFunction(FunctionName fnName, ArrayList<Type> argTypes,
+ Type retType, boolean hasVarArgs) {
+ return new AggregateFunction(fnName, argTypes, retType, hasVarArgs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
new file mode 100644
index 0000000..550d26f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.Function;
+import com.cloudera.impala.catalog.PrimitiveType;
+import com.cloudera.impala.catalog.ScalarFunction;
+import com.cloudera.impala.catalog.ScalarType;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.hive.executor.UdfExecutor.JavaUdfDataType;
+import com.cloudera.impala.thrift.TFunctionBinaryType;
+import com.cloudera.impala.thrift.TFunctionCategory;
+import com.cloudera.impala.thrift.TSymbolType;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE FUNCTION statement.
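+ *
+ * For reference, a sketch of the DDL this class analyzes (the function, library
+ * path and symbol name below are hypothetical):
+ *
+ * CREATE FUNCTION my_lower(STRING) RETURNS STRING
+ * LOCATION '/path/to/libudfs.so'
+ * SYMBOL='MyLower';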
+ */
+public class CreateUdfStmt extends CreateFunctionStmtBase {
+ /**
+ * Builds a CREATE FUNCTION statement.
+ * @param fnName - Name of the function.
+ * @param args - List of types for the arguments to this function.
+ * @param retTypeDef - The type this function returns.
+ * @param location - Path in HDFS containing the UDF.
+ * @param ifNotExists - If true, no errors are thrown if the function already exists.
+ * @param optArgs - Key/value pairs for additional arguments. The keys are
+ * validated in analyze().
+ */
+ public CreateUdfStmt(FunctionName fnName, FunctionArgs args,
+ TypeDef retTypeDef, HdfsUri location, boolean ifNotExists,
+ HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) {
+ super(fnName, args, retTypeDef, location, ifNotExists, optArgs);
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ super.analyze(analyzer);
+ Preconditions.checkNotNull(fn_);
+ Preconditions.checkState(fn_ instanceof ScalarFunction);
+ ScalarFunction udf = (ScalarFunction) fn_;
+
+ if (hasSignature()) {
+ if (udf.getBinaryType() == TFunctionBinaryType.JAVA) {
+ if (!JavaUdfDataType.isSupported(udf.getReturnType())) {
+ throw new AnalysisException(
+ "Type " + udf.getReturnType().toSql() + " is not supported for Java UDFs.");
+ }
+ for (int i = 0; i < udf.getNumArgs(); ++i) {
+ if (!JavaUdfDataType.isSupported(udf.getArgs()[i])) {
+ throw new AnalysisException(
+ "Type " + udf.getArgs()[i].toSql() + " is not supported for Java UDFs.");
+ }
+ }
+ }
+
+ if (udf.getReturnType().getPrimitiveType() == PrimitiveType.CHAR) {
+ throw new AnalysisException("UDFs that use CHAR are not yet supported.");
+ }
+ if (udf.getReturnType().getPrimitiveType() == PrimitiveType.VARCHAR) {
+ throw new AnalysisException("UDFs that use VARCHAR are not yet supported.");
+ }
+ for (int i = 0; i < udf.getNumArgs(); ++i) {
+ if (udf.getArgs()[i].getPrimitiveType() == PrimitiveType.CHAR) {
+ throw new AnalysisException("UDFs that use CHAR are not yet supported.");
+ }
+ if (udf.getArgs()[i].getPrimitiveType() == PrimitiveType.VARCHAR) {
+ throw new AnalysisException("UDFs that use VARCHAR are not yet supported.");
+ }
+ }
+ }
+
+ // Check that the user-provided symbol exists.
+ udf.setSymbolName(udf.lookupSymbol(
+ checkAndGetOptArg(OptArg.SYMBOL), TSymbolType.UDF_EVALUATE, null,
+ udf.hasVarArgs(), udf.getArgs()));
+
+ // Set optional Prepare/Close functions
+ String prepareFn = optArgs_.get(OptArg.PREPARE_FN);
+ if (prepareFn != null) {
+ udf.setPrepareFnSymbol(udf.lookupSymbol(prepareFn, TSymbolType.UDF_PREPARE));
+ }
+ String closeFn = optArgs_.get(OptArg.CLOSE_FN);
+ if (closeFn != null) {
+ udf.setCloseFnSymbol(udf.lookupSymbol(closeFn, TSymbolType.UDF_CLOSE));
+ }
+
+ // UDFs must not set any of these UDA-only options.
+ checkOptArgNotSet(OptArg.UPDATE_FN);
+ checkOptArgNotSet(OptArg.INIT_FN);
+ checkOptArgNotSet(OptArg.SERIALIZE_FN);
+ checkOptArgNotSet(OptArg.MERGE_FN);
+ checkOptArgNotSet(OptArg.FINALIZE_FN);
+
+ sqlString_ = udf.toSql(ifNotExists_);
+
+ // Check that there is no function with the same name whose isPersistent field
+ // differs from udf.isPersistent_, e.g. two Java UDFs with the same name but
+ // opposite persistence values are not allowed. This only applies to Java UDFs,
+ // as all native UDFs are persistent. Additionally, no exception is thrown if
+ // "IF NOT EXISTS" is specified in the query.
+ if (udf.getBinaryType() != TFunctionBinaryType.JAVA || ifNotExists_) return;
+
+ Preconditions.checkNotNull(db_);
+ for (Function fn: db_.getFunctions(udf.functionName())) {
+ if (!hasSignature() || fn.isPersistent()) {
+ throw new AnalysisException(
+ Analyzer.FN_ALREADY_EXISTS_ERROR_MSG + fn.signatureString());
+ }
+ }
+ }
+
+ @Override
+ protected Function createFunction(FunctionName fnName, ArrayList<Type> argTypes,
+ Type retType, boolean hasVarArgs) {
+ return new ScalarFunction(fnName, argTypes, retType, hasVarArgs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
new file mode 100644
index 0000000..c38eef0
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.RuntimeEnv;
+import com.cloudera.impala.thrift.TAccessEvent;
+import com.cloudera.impala.thrift.TCatalogObjectType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE VIEW statement.
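+ *
+ * For reference, a sketch of the DDL this class analyzes (names below are
+ * hypothetical):
+ *
+ * CREATE VIEW IF NOT EXISTS v (c1, c2) AS SELECT a, b FROM t;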
+ */
+public class CreateViewStmt extends CreateOrAlterViewStmtBase {
+
+ public CreateViewStmt(boolean ifNotExists, TableName tableName,
+ ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) {
+ super(ifNotExists, tableName, columnDefs, comment, viewDefStmt);
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+
+ tableName_.analyze();
+ // Use a child analyzer to let views have complex-typed columns.
+ Analyzer viewAnalyzer = new Analyzer(analyzer);
+ // Enforce Hive column labels for view compatibility.
+ viewAnalyzer.setUseHiveColLabels(true);
+ viewDefStmt_.analyze(viewAnalyzer);
+
+ dbName_ = analyzer.getTargetDbName(tableName_);
+ owner_ = analyzer.getUser().getName();
+ if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) &&
+ !ifNotExists_) {
+ throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG +
+ String.format("%s.%s", dbName_, tableName_.getTbl()));
+ }
+ analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(),
+ TCatalogObjectType.VIEW, Privilege.CREATE.toString()));
+
+ createColumnAndViewDefs(analyzer);
+ if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
+ computeLineageGraph(analyzer);
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE VIEW ");
+ if (ifNotExists_) sb.append("IF NOT EXISTS ");
+ if (tableName_.getDb() != null) sb.append(tableName_.getDb() + ".");
+ sb.append(tableName_.getTbl());
+ // The column list is optional in CREATE VIEW; only print it when given.
+ if (columnDefs_ != null) sb.append(" (" + Joiner.on(", ").join(columnDefs_) + ")");
+ sb.append(" AS ");
+ sb.append(viewDefStmt_.toSql());
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
new file mode 100644
index 0000000..efa2117
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.List;
+
+import com.cloudera.impala.common.Pair;
+import com.cloudera.impala.planner.DataSink;
+import com.cloudera.impala.planner.KuduTableSink;
+import com.cloudera.impala.planner.TableSink;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Representation of a DELETE statement.
+ *
+ * A delete statement contains three main parts: the target table reference, the FROM
+ * clause, and the optional WHERE clause. Syntactically, this is represented as follows:
+ *
+ * DELETE [FROM] dotted_path [WHERE expr]
+ * DELETE [table_alias] FROM table_ref_list [WHERE expr]
+ *
+ * Only the syntax using the explicit FROM clause can contain join conditions.
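+ *
+ * For reference, hypothetical examples of the two forms (DELETE currently
+ * targets Kudu tables):
+ *
+ * DELETE FROM kudu_tbl WHERE c1 = 1;
+ * DELETE t1 FROM kudu_tbl t1 JOIN other_tbl t2 ON t1.id = t2.id;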
+ */
+public class DeleteStmt extends ModifyStmt {
+
+ public DeleteStmt(List<String> targetTablePath, FromClause tableRefs,
+ Expr wherePredicate, boolean ignoreNotFound) {
+ super(targetTablePath, tableRefs, Lists.<Pair<SlotRef, Expr>>newArrayList(),
+ wherePredicate, ignoreNotFound);
+ }
+
+ public DeleteStmt(DeleteStmt other) {
+ // The WHERE clause is optional, so wherePredicate_ may be null; guard the clone.
+ super(other.targetTablePath_, other.fromClause_.clone(),
+ Lists.<Pair<SlotRef, Expr>>newArrayList(),
+ other.wherePredicate_ != null ? other.wherePredicate_.clone() : null,
+ other.ignoreNotFound_);
+ }
+
+ public DataSink createDataSink() {
+ // analyze() must have been called before.
+ Preconditions.checkState(table_ != null);
+ TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
+ ImmutableList.<Expr>of(), referencedColumns_, false, ignoreNotFound_);
+ Preconditions.checkState(!referencedColumns_.isEmpty());
+ return tableSink;
+ }
+
+ @Override
+ public DeleteStmt clone() {
+ return new DeleteStmt(this);
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder b = new StringBuilder();
+ b.append("DELETE");
+ if (ignoreNotFound_) b.append(" IGNORE");
+ if (fromClause_.size() > 1 || targetTableRef_.hasExplicitAlias()) {
+ b.append(" ");
+ if (targetTableRef_.hasExplicitAlias()) {
+ b.append(targetTableRef_.getExplicitAlias());
+ } else {
+ b.append(targetTableRef_.toSql());
+ }
+ }
+ b.append(fromClause_.toSql());
+ if (wherePredicate_ != null) {
+ b.append(" WHERE ");
+ b.append(wherePredicate_.toSql());
+ }
+ return b.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
new file mode 100644
index 0000000..0ddd6ec
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TDescribeDbParams;
+import com.cloudera.impala.thrift.TDescribeOutputStyle;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Represents a DESCRIBE DATABASE statement which returns metadata on
+ * a specified database:
+ * Syntax: DESCRIBE DATABASE [FORMATTED|EXTENDED] <db>
+ *
+ * If FORMATTED|EXTENDED is not specified, the statement only returns the given
+ * database's location and comment.
+ * If FORMATTED|EXTENDED is specified, extended metadata on the database is returned.
+ * This metadata includes the database's parameters, owner info,
+ * and privileges.
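+ *
+ * For example (the database name is hypothetical):
+ * DESCRIBE DATABASE tpch;
+ * DESCRIBE DATABASE EXTENDED tpch;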
+ */
+public class DescribeDbStmt extends StatementBase {
+ private final TDescribeOutputStyle outputStyle_;
+ private final String dbName_;
+
+ public DescribeDbStmt(String dbName, TDescribeOutputStyle outputStyle) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName), "Invalid database name");
+ dbName_ = dbName;
+ outputStyle_ = outputStyle;
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder("DESCRIBE DATABASE ");
+ if (outputStyle_ != TDescribeOutputStyle.MINIMAL) {
+ sb.append(outputStyle_.toString() + " ");
+ }
+ return sb.toString() + dbName_;
+ }
+
+ public String getDb() { return dbName_; }
+ public TDescribeOutputStyle getOutputStyle() { return outputStyle_; }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ analyzer.getDb(dbName_, Privilege.VIEW_METADATA);
+ }
+
+ public TDescribeDbParams toThrift() {
+ TDescribeDbParams params = new TDescribeDbParams();
+ params.setDb(dbName_);
+ params.setOutput_style(outputStyle_);
+ return params;
+ }
+}