You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:21 UTC

[04/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
new file mode 100644
index 0000000..7b59625
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.List;
+import java.util.EnumSet;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.HdfsTable;
+import com.cloudera.impala.catalog.KuduTable;
+import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import com.cloudera.impala.catalog.Table;
+import com.cloudera.impala.catalog.TableId;
+import com.cloudera.impala.catalog.TableLoadingException;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.service.CatalogOpExecutor;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents a CREATE TABLE AS SELECT (CTAS) statement
+ *
+ * The statement supports an optional PARTITIONED BY clause. Its syntax and semantics
+ * follow the PARTITION feature of INSERT FROM SELECT statements: inside the PARTITIONED
+ * BY (...) column list the user must specify names of the columns to partition by. These
+ * column names must appear in the specified order at the end of the select statement. A
+ * remapping between columns of the source and destination tables is not possible, because
+ * the destination table does not yet exist. Specifying static values for the partition
+ * columns is also not possible, as their type needs to be deduced from columns in the
+ * select statement.
+ */
+public class CreateTableAsSelectStmt extends StatementBase {
+  private final CreateTableStmt createStmt_;
+
+  // List of partition columns from the PARTITIONED BY (...) clause. Set to null if no
+  // partition was given.
+  private final List<String> partitionKeys_;
+
+  /////////////////////////////////////////
+  // BEGIN: Members that need to be reset()
+
+  private final InsertStmt insertStmt_;
+
+  // END: Members that need to be reset()
+  /////////////////////////////////////////
+
+  private final static EnumSet<THdfsFileFormat> SUPPORTED_INSERT_FORMATS =
+      EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT);
+
+  /**
+   * Builds a CREATE TABLE AS SELECT statement
+   */
+  public CreateTableAsSelectStmt(CreateTableStmt createStmt, QueryStmt queryStmt,
+      List<String> partitionKeys) {
+    Preconditions.checkNotNull(queryStmt);
+    Preconditions.checkNotNull(createStmt);
+    createStmt_ = createStmt;
+    partitionKeys_ = partitionKeys;
+    List<PartitionKeyValue> pkvs = null;
+    if (partitionKeys != null) {
+      pkvs = Lists.newArrayList();
+      for (String key: partitionKeys) {
+        pkvs.add(new PartitionKeyValue(key, null));
+      }
+    }
+    insertStmt_ = new InsertStmt(null, createStmt.getTblName(), false, pkvs,
+        null, queryStmt, null, false);
+  }
+
+  public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); }
+  public InsertStmt getInsertStmt() { return insertStmt_; }
+  public CreateTableStmt getCreateStmt() { return createStmt_; }
+  @Override
+  public String toSql() { return ToSqlUtils.getCreateTableSql(this); }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (isAnalyzed()) return;
+    super.analyze(analyzer);
+
+    // The analysis for CTAS happens in two phases - the first phase happens before
+    // the target table exists and we want to validate the CREATE statement and the
+    // query portion of the insert statement. If this passes, analysis will be run
+    // over the full INSERT statement. To avoid duplicate registrations of table/colRefs,
+    // create a new root analyzer and clone the query statement for this initial pass.
+    Analyzer dummyRootAnalyzer = new Analyzer(analyzer.getCatalog(),
+        analyzer.getQueryCtx(), analyzer.getAuthzConfig());
+    QueryStmt tmpQueryStmt = insertStmt_.getQueryStmt().clone();
+    try {
+      Analyzer tmpAnalyzer = new Analyzer(dummyRootAnalyzer);
+      tmpAnalyzer.setUseHiveColLabels(true);
+      tmpQueryStmt.analyze(tmpAnalyzer);
+      // Subqueries need to be rewritten by the StmtRewriter first.
+      if (analyzer.containsSubquery()) return;
+    } finally {
+      // Record missing tables in the original analyzer.
+      analyzer.getMissingTbls().addAll(dummyRootAnalyzer.getMissingTbls());
+    }
+
+    // Add the columns from the partition clause to the create statement.
+    if (partitionKeys_ != null) {
+      int colCnt = tmpQueryStmt.getColLabels().size();
+      int partColCnt = partitionKeys_.size();
+      if (partColCnt >= colCnt) {
+        throw new AnalysisException(String.format("Number of partition columns (%s) " +
+            "must be smaller than the number of columns in the select statement (%s).",
+            partColCnt, colCnt));
+      }
+      int firstCol = colCnt - partColCnt;
+      for (int i = firstCol, j = 0; i < colCnt; ++i, ++j) {
+        String partitionLabel = partitionKeys_.get(j);
+        String colLabel = tmpQueryStmt.getColLabels().get(i);
+
+        // Ensure that partition columns are named and positioned at end of
+        // input column list.
+        if (!partitionLabel.equals(colLabel)) {
+          throw new AnalysisException(String.format("Partition column name " +
+              "mismatch: %s != %s", partitionLabel, colLabel));
+        }
+
+        ColumnDef colDef = new ColumnDef(colLabel, null, null);
+        colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
+        createStmt_.getPartitionColumnDefs().add(colDef);
+      }
+      // Remove partition columns from table column list.
+      tmpQueryStmt.getColLabels().subList(firstCol, colCnt).clear();
+    }
+
+    // Add the columns from the select statement to the create statement.
+    int colCnt = tmpQueryStmt.getColLabels().size();
+    createStmt_.getColumnDefs().clear();
+    for (int i = 0; i < colCnt; ++i) {
+      ColumnDef colDef = new ColumnDef(
+          tmpQueryStmt.getColLabels().get(i), null, null);
+      colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
+      createStmt_.getColumnDefs().add(colDef);
+    }
+    createStmt_.analyze(analyzer);
+
+    if (!SUPPORTED_INSERT_FORMATS.contains(createStmt_.getFileFormat())) {
+      throw new AnalysisException(String.format("CREATE TABLE AS SELECT " +
+          "does not support (%s) file format. Supported formats are: (%s)",
+          createStmt_.getFileFormat().toString().replace("_", ""),
+          "PARQUET, TEXTFILE"));
+    }
+
+    // The full privilege check for the database will be done as part of the INSERT
+    // analysis.
+    Db db = analyzer.getDb(createStmt_.getDb(), Privilege.ANY);
+    if (db == null) {
+      throw new AnalysisException(
+          Analyzer.DB_DOES_NOT_EXIST_ERROR_MSG + createStmt_.getDb());
+    }
+
+    // Running analysis on the INSERT portion of the CTAS requires the target INSERT
+    // table to "exist". For CTAS the table does not exist yet, so create a "temp"
+    // table to run analysis against. The schema of this temp table should exactly
+    // match the schema of the table that will be created by running the CREATE
+    // statement.
+    org.apache.hadoop.hive.metastore.api.Table msTbl =
+        CatalogOpExecutor.createMetaStoreTable(createStmt_.toThrift());
+
+    try (MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient()) {
+      // Set a valid location of this table using the same rules as the metastore. If the
+      // user specified a location for the table this will be a no-op.
+      msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString());
+
+      // Create a "temp" table based off the given metastore.api.Table object. Normally,
+      // the CatalogService assigns all table IDs, but in this case we need to assign the
+      // "temp" table an ID locally. This table ID cannot conflict with any table in the
+      // SelectStmt (or the BE will be very confused). To ensure the ID is unique within
+      // this query, just assign it the invalid table ID. The CatalogServer will assign
+      // this table a proper ID once it is created there as part of the CTAS execution.
+      Table table = Table.fromMetastoreTable(TableId.createInvalidId(), db, msTbl);
+      Preconditions.checkState(table != null &&
+          (table instanceof HdfsTable || table instanceof KuduTable));
+
+      table.load(true, client.getHiveClient(), msTbl);
+      insertStmt_.setTargetTable(table);
+    } catch (TableLoadingException e) {
+      throw new AnalysisException(e.getMessage(), e);
+    } catch (Exception e) {
+      throw new AnalysisException(e.getMessage(), e);
+    }
+
+    // Finally, run analysis on the insert statement.
+    insertStmt_.analyze(analyzer);
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    insertStmt_.reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
new file mode 100644
index 0000000..0faf881
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_API_VER;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_CLASS;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_DATA_SRC_NAME;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_INIT_STRING;
+import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_LOCATION;
+
+import java.util.List;
+import java.util.Map;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.DataSource;
+import com.cloudera.impala.catalog.DataSourceTable;
+import com.cloudera.impala.catalog.RowFormat;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.permission.FsAction;
+
+/**
+ * Represents a CREATE TABLE statement for external data sources. Such tables
+ * reference an external data source (created with a CREATE DATA SOURCE statement)
+ * and the properties of that source are stored in the table properties because
+ * the metastore does not store the data sources themselves.
+ */
+public class CreateTableDataSrcStmt extends CreateTableStmt {
+
+  public CreateTableDataSrcStmt(TableName tableName, List<ColumnDef> columnDefs,
+      String dataSourceName, String initString, String comment, boolean ifNotExists) {
+    super(tableName, columnDefs, Lists.<ColumnDef>newArrayList(), false, comment,
+        RowFormat.DEFAULT_ROW_FORMAT, THdfsFileFormat.TEXT, null, null, ifNotExists,
+        createInitialTableProperties(dataSourceName, initString),
+        Maps.<String, String>newHashMap(), null);
+  }
+
+  /**
+   * Creates the initial map of table properties containing the name of the data
+   * source and the table init string.
+   */
+  private static Map<String, String> createInitialTableProperties(
+      String dataSourceName, String initString) {
+    Preconditions.checkNotNull(dataSourceName);
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(TBL_PROP_DATA_SRC_NAME, dataSourceName.toLowerCase());
+    tableProperties.put(TBL_PROP_INIT_STRING, Strings.nullToEmpty(initString));
+    return tableProperties;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    String dataSourceName = getTblProperties().get(TBL_PROP_DATA_SRC_NAME);
+    DataSource dataSource = analyzer.getCatalog().getDataSource(dataSourceName);
+    if (dataSource == null) {
+      throw new AnalysisException("Data source does not exist: " + dataSourceName);
+    }
+
+    for (ColumnDef col: getColumnDefs()) {
+      if (!DataSourceTable.isSupportedColumnType(col.getType())) {
+        throw new AnalysisException("Tables produced by an external data source do " +
+            "not support the column type: " + col.getType());
+      }
+    }
+    // Add table properties from the DataSource catalog object now that we have access
+    // to the catalog. These are stored in the table metadata because DataSource catalog
+    // objects are not currently persisted.
+    String location = dataSource.getLocation();
+    getTblProperties().put(TBL_PROP_LOCATION, location);
+    getTblProperties().put(TBL_PROP_CLASS, dataSource.getClassName());
+    getTblProperties().put(TBL_PROP_API_VER, dataSource.getApiVersion());
+    new HdfsUri(location).analyze(analyzer, Privilege.ALL, FsAction.READ);
+    // TODO: check class exists and implements API version
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
new file mode 100644
index 0000000..6695cac
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.ArrayType;
+import com.cloudera.impala.catalog.HdfsCompression;
+import com.cloudera.impala.catalog.HdfsFileFormat;
+import com.cloudera.impala.catalog.MapType;
+import com.cloudera.impala.catalog.RowFormat;
+import com.cloudera.impala.catalog.ScalarType;
+import com.cloudera.impala.catalog.StructField;
+import com.cloudera.impala.catalog.StructType;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FileSystemUtil;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+
+/**
+ * Represents a CREATE TABLE tablename LIKE fileformat '/path/to/file' statement
+ * where the schema is inferred from the given file. Does not partition the table by
+ * default.
+ */
+public class CreateTableLikeFileStmt extends CreateTableStmt {
+  private final HdfsUri schemaLocation_;
+  private final THdfsFileFormat schemaFileFormat_;
+  private final static String ERROR_MSG =
+      "Failed to convert Parquet type\n%s\nto an Impala %s type:\n%s\n";
+
+  public CreateTableLikeFileStmt(TableName tableName, THdfsFileFormat schemaFileFormat,
+      HdfsUri schemaLocation, List<ColumnDef> partitionColumnDescs,
+      boolean isExternal, String comment, RowFormat rowFormat,
+      THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
+      boolean ifNotExists, Map<String, String> tblProperties,
+      Map<String, String> serdeProperties) {
+    super(tableName, new ArrayList<ColumnDef>(), partitionColumnDescs,
+        isExternal, comment, rowFormat,
+        fileFormat, location, cachingOp, ifNotExists, tblProperties, serdeProperties,
+        null);
+    schemaLocation_ = schemaLocation;
+    schemaFileFormat_ = schemaFileFormat;
+  }
+
+  /**
+   * Reads the first block from the given HDFS file and returns the Parquet schema.
+   * Throws Analysis exception for any failure, such as failing to read the file
+   * or failing to parse the contents.
+   */
+  private static parquet.schema.MessageType loadParquetSchema(Path pathToFile)
+      throws AnalysisException {
+    try {
+      FileSystem fs = pathToFile.getFileSystem(FileSystemUtil.getConfiguration());
+      if (!fs.isFile(pathToFile)) {
+        throw new AnalysisException("Cannot infer schema, path is not a file: " +
+                                    pathToFile);
+      }
+    } catch (IOException e) {
+      throw new AnalysisException("Failed to connect to filesystem:" + e);
+    } catch (IllegalArgumentException e) {
+      throw new AnalysisException(e.getMessage());
+    }
+    ParquetMetadata readFooter = null;
+    try {
+      readFooter = ParquetFileReader.readFooter(FileSystemUtil.getConfiguration(),
+          pathToFile);
+    } catch (FileNotFoundException e) {
+      throw new AnalysisException("File not found: " + e);
+    } catch (IOException e) {
+      throw new AnalysisException("Failed to open file as a parquet file: " + e);
+    } catch (RuntimeException e) {
+      // Parquet throws a generic RuntimeException when reading a non-parquet file
+      if (e.toString().contains("is not a Parquet file")) {
+        throw new AnalysisException("File is not a parquet file: " + pathToFile);
+      }
+      // otherwise, who knows what we caught, throw it back up
+      throw e;
+    }
+     return readFooter.getFileMetaData().getSchema();
+  }
+
+  /**
+   * Converts a "primitive" Parquet type to an Impala type.
+   * A primitive type is a non-nested type with no annotations.
+   */
+  private static Type convertPrimitiveParquetType(parquet.schema.Type parquetType)
+      throws AnalysisException {
+    Preconditions.checkState(parquetType.isPrimitive());
+    PrimitiveType prim = parquetType.asPrimitiveType();
+    switch (prim.getPrimitiveTypeName()) {
+      case BINARY: return Type.STRING;
+      case BOOLEAN: return Type.BOOLEAN;
+      case DOUBLE: return Type.DOUBLE;
+      case FIXED_LEN_BYTE_ARRAY:
+        throw new AnalysisException(
+            "Unsupported parquet type FIXED_LEN_BYTE_ARRAY for field " +
+                parquetType.getName());
+      case FLOAT: return Type.FLOAT;
+      case INT32: return Type.INT;
+      case INT64: return Type.BIGINT;
+      case INT96: return Type.TIMESTAMP;
+      default:
+        Preconditions.checkState(false, "Unexpected parquet primitive type: " +
+               prim.getPrimitiveTypeName());
+        return null;
+    }
+  }
+
+  /**
+   * Converts a Parquet group type to an Impala map Type. We support both standard
+   * Parquet map representations, as well as legacy. Legacy representations are handled
+   * according to this specification:
+   * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+   *
+   * Standard representation of a map in Parquet:
+   * <optional | required> group <name> (MAP) { <-- outerGroup is pointing at this
+   * repeated group key_value {
+   *     required <key-type> key;
+   *     <optional | required> <value-type> value;
+   *   }
+   * }
+   */
+  private static MapType convertMap(parquet.schema.GroupType outerGroup)
+      throws AnalysisException {
+    if (outerGroup.getFieldCount() != 1){
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The logical MAP type must have exactly 1 inner field."));
+    }
+
+    parquet.schema.Type innerField = outerGroup.getType(0);
+    if (!innerField.isRepetition(parquet.schema.Type.Repetition.REPEATED)){
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The logical MAP type must have a repeated inner field."));
+    }
+    if (innerField.isPrimitive()) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The inner field of the logical MAP type must be a group."));
+    }
+
+    parquet.schema.GroupType innerGroup = innerField.asGroupType();
+    // It does not matter whether innerGroup has an annotation or not (for example it may
+    // be annotated with MAP_KEY_VALUE). We treat the case that innerGroup has an
+    // annotation and the case the innerGroup does not have an annotation the same.
+    if (innerGroup.getFieldCount() != 2) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The inner field of the logical MAP type must have exactly 2 fields."));
+    }
+
+    parquet.schema.Type key = innerGroup.getType(0);
+    if (!key.getName().equals("key")) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The name of the first field of the inner field of the logical MAP " +
+          "type must be 'key'"));
+    }
+    if (!key.isPrimitive()) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The key type of the logical MAP type must be primitive."));
+    }
+    parquet.schema.Type value = innerGroup.getType(1);
+    if (!value.getName().equals("value")) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "MAP", "The name of the second field of the inner field of the logical MAP " +
+          "type must be 'value'"));
+    }
+
+    return new MapType(convertParquetType(key), convertParquetType(value));
+  }
+
+  /**
+   * Converts a Parquet group type to an Impala struct Type.
+   */
+  private static StructType convertStruct(parquet.schema.GroupType outerGroup)
+      throws AnalysisException {
+    ArrayList<StructField> structFields = new ArrayList<StructField>();
+    for (parquet.schema.Type field: outerGroup.getFields()) {
+      StructField f = new StructField(field.getName(), convertParquetType(field));
+      structFields.add(f);
+    }
+    return new StructType(structFields);
+  }
+
+  /**
+   * Converts a Parquet group type to an Impala array Type. We can handle the standard
+   * representation, but also legacy representations for backwards compatibility.
+   * Legacy representations are handled according to this specification:
+   * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+   *
+   * Standard representation of an array in Parquet:
+   * <optional | required> group <name> (LIST) { <-- outerGroup is pointing at this
+   *   repeated group list {
+   *     <optional | required> <element-type> element;
+   *   }
+   * }
+   */
+  private static ArrayType convertArray(parquet.schema.GroupType outerGroup)
+      throws AnalysisException {
+    if (outerGroup.getFieldCount() != 1) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "LIST", "The logical LIST type must have exactly 1 inner field."));
+    }
+
+    parquet.schema.Type innerField = outerGroup.getType(0);
+    if (!innerField.isRepetition(parquet.schema.Type.Repetition.REPEATED)) {
+      throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
+          "LIST", "The inner field of the logical LIST type must be repeated."));
+    }
+    if (innerField.isPrimitive() || innerField.getOriginalType() != null) {
+      // From the Parquet Spec:
+      // 1. If the repeated field is not a group then it's type is the element type.
+      //
+      // If innerField is a group, but originalType is not null, the element type is
+      // based on the logical type.
+      return new ArrayType(convertParquetType(innerField));
+    }
+
+    parquet.schema.GroupType innerGroup = innerField.asGroupType();
+    if (innerGroup.getFieldCount() != 1) {
+      // From the Parquet Spec:
+      // 2. If the repeated field is a group with multiple fields, then it's type is a
+      //    struct.
+      return new ArrayType(convertStruct(innerGroup));
+    }
+
+    return new ArrayType(convertParquetType(innerGroup.getType(0)));
+  }
+
+  /**
+   * Converts a "logical" Parquet type to an Impala column type.
+   * A Parquet type is considered logical when it has an annotation. The annotation is
+   * stored as a "OriginalType". The Parquet documentation refers to these as logical
+   * types, so we use that terminology here.
+   */
+  private static Type convertLogicalParquetType(parquet.schema.Type parquetType)
+      throws AnalysisException {
+    OriginalType orig = parquetType.getOriginalType();
+    if (orig == OriginalType.LIST) {
+      return convertArray(parquetType.asGroupType());
+    }
+    if (orig == OriginalType.MAP || orig == OriginalType.MAP_KEY_VALUE) {
+      // MAP_KEY_VALUE annotation should not be used any more. However, according to the
+      // Parquet spec, some existing data incorrectly uses MAP_KEY_VALUE in place of MAP.
+      // For backward-compatibility, a group annotated with MAP_KEY_VALUE that is not
+      // contained by a MAP-annotated group should be handled as a MAP-annotated group.
+      return convertMap(parquetType.asGroupType());
+    }
+
+    PrimitiveType prim = parquetType.asPrimitiveType();
+    if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY &&
+        orig == OriginalType.UTF8) {
+      // UTF8 is the type annotation Parquet uses for strings
+      // We check to make sure it applies to BINARY to avoid errors if there is a bad
+      // annotation.
+      return Type.STRING;
+    }
+
+    if (orig == OriginalType.DECIMAL) {
+      return ScalarType.createDecimalType(prim.getDecimalMetadata().getPrecision(),
+                                           prim.getDecimalMetadata().getScale());
+    }
+
+    throw new AnalysisException(
+        "Unsupported logical parquet type " + orig + " (primitive type is " +
+            prim.getPrimitiveTypeName().name() + ") for field " +
+            parquetType.getName());
+  }
+
+  /**
+   * Converts a Parquet type into an Impala type.
+   */
+  private static Type convertParquetType(parquet.schema.Type field)
+      throws AnalysisException {
+    Type type = null;
+    // TODO for 2.3: If a field is not annotated with LIST, it can still be sometimes
+    // interpreted as an array. The following 2 examples should be interpreted as an array
+    // of integers, but this is currently not done.
+    // 1. repeated int int_col;
+    // 2. required group int_arr {
+    //      repeated group list {
+    //        required int element;
+    //      }
+    //    }
+    if (field.getOriginalType() != null) {
+      type = convertLogicalParquetType(field);
+    } else if (field.isPrimitive()) {
+      type = convertPrimitiveParquetType(field);
+    } else {
+      // If field is not primitive, it must be a struct.
+      type = convertStruct(field.asGroupType());
+    }
+    return type;
+  }
+
+  /**
+   * Parses a Parquet file stored in HDFS and returns the corresponding Impala schema.
+   * This fails with an analysis exception if any errors occur reading the file,
+   * parsing the Parquet schema, or if the Parquet types cannot be represented in Impala.
+   */
+  private static List<ColumnDef> extractParquetSchema(HdfsUri location)
+      throws AnalysisException {
+    parquet.schema.MessageType parquetSchema = loadParquetSchema(location.getPath());
+    List<parquet.schema.Type> fields = parquetSchema.getFields();
+    List<ColumnDef> schema = new ArrayList<ColumnDef>();
+
+    for (parquet.schema.Type field: fields) {
+      Type type = convertParquetType(field);
+      Preconditions.checkNotNull(type);
+      String colName = field.getName();
+      schema.add(new ColumnDef(colName, new TypeDef(type),
+          "Inferred from Parquet file."));
+    }
+    return schema;
+  }
+
+  @Override
+  public String toSql() {
+    ArrayList<String> colsSql = Lists.newArrayList();
+    ArrayList<String> partitionColsSql = Lists.newArrayList();
+    HdfsCompression compression = HdfsCompression.fromFileName(
+        schemaLocation_.toString());
+    String s = ToSqlUtils.getCreateTableSql(getDb(),
+        getTbl() + " __LIKE_FILEFORMAT__ ",  getComment(), colsSql, partitionColsSql,
+        getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
+        getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()),
+        compression, null, getLocation());
+    s = s.replace("__LIKE_FILEFORMAT__", "LIKE " + schemaFileFormat_ + " " +
+        schemaLocation_.toString());
+    return s;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    schemaLocation_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    switch (schemaFileFormat_) {
+      case PARQUET:
+        getColumnDefs().addAll(extractParquetSchema(schemaLocation_));
+        break;
+      default:
+        throw new AnalysisException("Unsupported file type for schema inference: "
+            + schemaFileFormat_);
+    }
+    super.analyze(analyzer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
new file mode 100644
index 0000000..a7e2038
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import org.apache.hadoop.fs.permission.FsAction;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TAccessEvent;
+import com.cloudera.impala.thrift.TCatalogObjectType;
+import com.cloudera.impala.thrift.TCreateTableLikeParams;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.cloudera.impala.thrift.TTableName;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE TABLE LIKE statement which creates a new table based on
+ * a copy of an existing table definition.
+ */
+public class CreateTableLikeStmt extends StatementBase {
+  private final TableName tableName_;
+  private final TableName srcTableName_;
+  private final boolean isExternal_;
+  private final String comment_;
+  private final THdfsFileFormat fileFormat_;
+  private final HdfsUri location_;
+  private final boolean ifNotExists_;
+
+  // Set during analysis
+  private String dbName_;
+  private String srcDbName_;
+  private String owner_;
+
+  /**
+   * Builds a CREATE TABLE LIKE statement
+   * @param tableName - Name of the new table
+   * @param srcTableName - Name of the source table (table to copy)
+   * @param isExternal - If true, the table's data will be preserved if dropped.
+   * @param comment - Comment to attach to the table
+   * @param fileFormat - File format of the table
+   * @param location - The HDFS location of where the table data will stored.
+   * @param ifNotExists - If true, no errors are thrown if the table already exists
+   */
+  public CreateTableLikeStmt(TableName tableName, TableName srcTableName,
+      boolean isExternal, String comment, THdfsFileFormat fileFormat, HdfsUri location,
+      boolean ifNotExists) {
+    Preconditions.checkNotNull(tableName);
+    Preconditions.checkNotNull(srcTableName);
+    this.tableName_ = tableName;
+    this.srcTableName_ = srcTableName;
+    this.isExternal_ = isExternal;
+    this.comment_ = comment;
+    this.fileFormat_ = fileFormat;
+    this.location_ = location;
+    this.ifNotExists_ = ifNotExists;
+  }
+
+  public String getTbl() { return tableName_.getTbl(); }
+  public String getSrcTbl() { return srcTableName_.getTbl(); }
+  public boolean isExternal() { return isExternal_; }
+  public boolean getIfNotExists() { return ifNotExists_; }
+  public String getComment() { return comment_; }
+  public THdfsFileFormat getFileFormat() { return fileFormat_; }
+  public HdfsUri getLocation() { return location_; }
+
+  /**
+   * Can only be called after analysis, returns the name of the database the table will
+   * be created within.
+   */
+  public String getDb() {
+    Preconditions.checkNotNull(dbName_);
+    return dbName_;
+  }
+
+  /**
+   * Can only be called after analysis, returns the name of the database the table will
+   * be created within.
+   */
+  public String getSrcDb() {
+    Preconditions.checkNotNull(srcDbName_);
+    return srcDbName_;
+  }
+
+  public String getOwner() {
+    Preconditions.checkNotNull(owner_);
+    return owner_;
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder("CREATE ");
+    if (isExternal_) sb.append("EXTERNAL ");
+    sb.append("TABLE ");
+    if (ifNotExists_) sb.append("IF NOT EXISTS ");
+    if (tableName_.getDb() != null) sb.append(tableName_.getDb() + ".");
+    sb.append(tableName_.getTbl() + " LIKE ");
+    if (srcTableName_.getDb() != null) sb.append(srcTableName_.getDb() + ".");
+    sb.append(srcTableName_.getTbl());
+    if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'");
+    if (fileFormat_ != null) sb.append(" STORED AS " + fileFormat_);
+    if (location_ != null) sb.append(" LOCATION '" + location_ + "'");
+    return sb.toString();
+  }
+
+  public TCreateTableLikeParams toThrift() {
+    TCreateTableLikeParams params = new TCreateTableLikeParams();
+    params.setTable_name(new TTableName(getDb(), getTbl()));
+    params.setSrc_table_name(new TTableName(getSrcDb(), getSrcTbl()));
+    params.setOwner(getOwner());
+    params.setIs_external(isExternal());
+    params.setComment(comment_);
+    if (fileFormat_ != null) params.setFile_format(fileFormat_);
+    params.setLocation(location_ == null ? null : location_.toString());
+    params.setIf_not_exists(getIfNotExists());
+    return params;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+    Preconditions.checkState(srcTableName_ != null && !srcTableName_.isEmpty());
+    // Make sure the source table exists and the user has permission to access it.
+    srcDbName_ = analyzer
+        .getTable(srcTableName_, Privilege.VIEW_METADATA)
+        .getDb().getName();
+    tableName_.analyze();
+    dbName_ = analyzer.getTargetDbName(tableName_);
+    owner_ = analyzer.getUser().getName();
+
+    if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) &&
+        !ifNotExists_) {
+      throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG +
+          String.format("%s.%s", dbName_, getTbl()));
+    }
+    analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(),
+        TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
+
+    if (location_ != null) {
+      location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
new file mode 100644
index 0000000..f7b683f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.fs.permission.FsAction;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.catalog.HdfsStorageDescriptor;
+import com.cloudera.impala.catalog.KuduTable;
+import com.cloudera.impala.catalog.RowFormat;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FileSystemUtil;
+import com.cloudera.impala.thrift.TAccessEvent;
+import com.cloudera.impala.thrift.TCatalogObjectType;
+import com.cloudera.impala.thrift.TCreateTableParams;
+import com.cloudera.impala.thrift.THdfsFileFormat;
+import com.cloudera.impala.thrift.TTableName;
+import com.cloudera.impala.util.AvroSchemaConverter;
+import com.cloudera.impala.util.AvroSchemaParser;
+import com.cloudera.impala.util.AvroSchemaUtils;
+import com.cloudera.impala.util.KuduUtil;
+import com.cloudera.impala.util.MetaStoreUtil;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Represents a CREATE TABLE statement.
+ */
+public class CreateTableStmt extends StatementBase {
+  private List<ColumnDef> columnDefs_;
+  private final String comment_;
+  private final boolean isExternal_;
+  private final boolean ifNotExists_;
+  private final THdfsFileFormat fileFormat_;
+  private final ArrayList<ColumnDef> partitionColDefs_;
+  private final RowFormat rowFormat_;
+  private TableName tableName_;
+  private final Map<String, String> tblProperties_;
+  private final Map<String, String> serdeProperties_;
+  private final HdfsCachingOp cachingOp_;
+  private HdfsUri location_;
+  private final List<DistributeParam> distributeParams_;
+
+  // Set during analysis
+  private String owner_;
+
+  /**
+   * Builds a CREATE TABLE statement
+   * @param tableName - Name of the new table
+   * @param columnDefs - List of column definitions for the table
+   * @param partitionColumnDefs - List of partition column definitions for the table
+   * @param isExternal - If true, the table's data will be preserved if dropped.
+   * @param comment - Comment to attach to the table
+   * @param rowFormat - Custom row format of the table. Use RowFormat.DEFAULT_ROW_FORMAT
+   *          to specify default row format.
+   * @param fileFormat - File format of the table
+   * @param location - The HDFS location of where the table data will stored.
+   * @param cachingOp - The HDFS caching op that should be applied to this table.
+   * @param ifNotExists - If true, no errors are thrown if the table already exists.
+   * @param tblProperties - Optional map of key/values to persist with table metadata.
+   * @param serdeProperties - Optional map of key/values to persist with table serde
+   *                          metadata.
+   */
+  public CreateTableStmt(TableName tableName, List<ColumnDef> columnDefs,
+      List<ColumnDef> partitionColumnDefs, boolean isExternal, String comment,
+      RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location,
+      HdfsCachingOp cachingOp, boolean ifNotExists, Map<String, String> tblProperties,
+      Map<String, String> serdeProperties, List<DistributeParam> distributeParams) {
+    Preconditions.checkNotNull(columnDefs);
+    Preconditions.checkNotNull(partitionColumnDefs);
+    Preconditions.checkNotNull(fileFormat);
+    Preconditions.checkNotNull(rowFormat);
+    Preconditions.checkNotNull(tableName);
+
+    columnDefs_ = Lists.newArrayList(columnDefs);
+    comment_ = comment;
+    isExternal_ = isExternal;
+    ifNotExists_ = ifNotExists;
+    fileFormat_ = fileFormat;
+    location_ = location;
+    cachingOp_ = cachingOp;
+    partitionColDefs_ = Lists.newArrayList(partitionColumnDefs);
+    rowFormat_ = rowFormat;
+    tableName_ = tableName;
+    tblProperties_ = tblProperties;
+    serdeProperties_ = serdeProperties;
+    unescapeProperties(tblProperties_);
+    unescapeProperties(serdeProperties_);
+    distributeParams_ = distributeParams;
+  }
+
+  /**
+   * Copy c'tor.
+   */
+  public CreateTableStmt(CreateTableStmt other) {
+    columnDefs_ = Lists.newArrayList(other.columnDefs_);
+    comment_ = other.comment_;
+    isExternal_ = other.isExternal_;
+    ifNotExists_ = other.ifNotExists_;
+    fileFormat_ = other.fileFormat_;
+    location_ = other.location_;
+    cachingOp_ = other.cachingOp_;
+    partitionColDefs_ = Lists.newArrayList(other.partitionColDefs_);
+    rowFormat_ = other.rowFormat_;
+    tableName_ = other.tableName_;
+    tblProperties_ = other.tblProperties_;
+    serdeProperties_ = other.serdeProperties_;
+    distributeParams_ = other.distributeParams_;
+  }
+
+  @Override
+  public CreateTableStmt clone() { return new CreateTableStmt(this); }
+
+  public String getTbl() { return tableName_.getTbl(); }
+  public TableName getTblName() { return tableName_; }
+  public List<ColumnDef> getColumnDefs() { return columnDefs_; }
+  public List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
+  public String getComment() { return comment_; }
+  public boolean isExternal() { return isExternal_; }
+  public boolean getIfNotExists() { return ifNotExists_; }
+  public HdfsUri getLocation() { return location_; }
+  public void setLocation(HdfsUri location) { this.location_ = location; }
+  public THdfsFileFormat getFileFormat() { return fileFormat_; }
+  public RowFormat getRowFormat() { return rowFormat_; }
+  public Map<String, String> getTblProperties() { return tblProperties_; }
+  public Map<String, String> getSerdeProperties() { return serdeProperties_; }
+
+  /**
+   * Can only be called after analysis, returns the owner of this table (the user from
+   * the current session).
+   */
+  public String getOwner() {
+    Preconditions.checkNotNull(owner_);
+    return owner_;
+  }
+
+  /**
+   * Can only be called after analysis, returns the name of the database the table will
+   * be created within.
+   */
+  public String getDb() {
+    Preconditions.checkState(isAnalyzed());
+    return tableName_.getDb();
+  }
+
+  @Override
+  public String toSql() { return ToSqlUtils.getCreateTableSql(this); }
+
+  public TCreateTableParams toThrift() {
+    TCreateTableParams params = new TCreateTableParams();
+    params.setTable_name(new TTableName(getDb(), getTbl()));
+    for (ColumnDef col: getColumnDefs()) {
+      params.addToColumns(col.toThrift());
+    }
+    for (ColumnDef col: getPartitionColumnDefs()) {
+      params.addToPartition_columns(col.toThrift());
+    }
+    params.setOwner(getOwner());
+    params.setIs_external(isExternal());
+    params.setComment(comment_);
+    params.setLocation(location_ == null ? null : location_.toString());
+    if (cachingOp_ != null) params.setCache_op(cachingOp_.toThrift());
+    params.setRow_format(rowFormat_.toThrift());
+    params.setFile_format(fileFormat_);
+    params.setIf_not_exists(getIfNotExists());
+    if (tblProperties_ != null) params.setTable_properties(tblProperties_);
+    if (serdeProperties_ != null) params.setSerde_properties(serdeProperties_);
+    if (distributeParams_ != null) {
+      for (DistributeParam d : distributeParams_) {
+        params.addToDistribute_by(d.toThrift());
+      }
+    }
+    return params;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+    tableName_ = analyzer.getFqTableName(tableName_);
+    tableName_.analyze();
+    owner_ = analyzer.getUser().getName();
+
+    MetaStoreUtil.checkShortPropertyMap("Property", tblProperties_);
+    MetaStoreUtil.checkShortPropertyMap("Serde property", serdeProperties_);
+
+    if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(),
+        Privilege.CREATE) && !ifNotExists_) {
+      throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_);
+    }
+
+    analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(),
+        TCatalogObjectType.TABLE, Privilege.CREATE.toString()));
+
+    // Only Avro tables can have empty column defs because they can infer them from
+    // the Avro schema.
+    if (columnDefs_.isEmpty() && fileFormat_ != THdfsFileFormat.AVRO) {
+      throw new AnalysisException("Table requires at least 1 column");
+    }
+
+    if (location_ != null) {
+      location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    }
+
+    analyzeRowFormat(analyzer);
+
+    // Check that all the column names are valid and unique.
+    analyzeColumnDefs(analyzer);
+
+    if (getTblProperties() != null && KuduTable.KUDU_STORAGE_HANDLER.equals(
+        getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
+      analyzeKuduTable(analyzer);
+    } else if (distributeParams_ != null) {
+      throw new AnalysisException("Only Kudu tables can use DISTRIBUTE BY clause.");
+    }
+
+    if (fileFormat_ == THdfsFileFormat.AVRO) {
+      columnDefs_ = analyzeAvroSchema(analyzer);
+      if (columnDefs_.isEmpty()) {
+        throw new AnalysisException(
+            "An Avro table requires column definitions or an Avro schema.");
+      }
+      AvroSchemaUtils.setFromSerdeComment(columnDefs_);
+      analyzeColumnDefs(analyzer);
+    }
+
+    if (cachingOp_ != null) {
+      cachingOp_.analyze(analyzer);
+      if (cachingOp_.shouldCache() && location_ != null &&
+          !FileSystemUtil.isPathCacheable(location_.getPath())) {
+        throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
+            "Please retry without caching: CREATE TABLE %s ... UNCACHED",
+            location_.toString(), tableName_));
+      }
+    }
+
+    // Analyze 'skip.header.line.format' property.
+    if (tblProperties_ != null) {
+      AlterTableSetTblProperties.analyzeSkipHeaderLineCount(tblProperties_);
+    }
+  }
+
+  private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
+    Byte fieldDelim = analyzeRowFormatValue(rowFormat_.getFieldDelimiter());
+    Byte lineDelim = analyzeRowFormatValue(rowFormat_.getLineDelimiter());
+    Byte escapeChar = analyzeRowFormatValue(rowFormat_.getEscapeChar());
+    if (fileFormat_ == THdfsFileFormat.TEXT) {
+      if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM;
+      if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM;
+      if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR;
+      if (fieldDelim != null && lineDelim != null && fieldDelim.equals(lineDelim)) {
+        throw new AnalysisException("Field delimiter and line delimiter have same " +
+            "value: byte " + fieldDelim);
+      }
+      if (fieldDelim != null && escapeChar != null && fieldDelim.equals(escapeChar)) {
+        analyzer.addWarning("Field delimiter and escape character have same value: " +
+            "byte " + fieldDelim + ". Escape character will be ignored");
+      }
+      if (lineDelim != null && escapeChar != null && lineDelim.equals(escapeChar)) {
+        analyzer.addWarning("Line delimiter and escape character have same value: " +
+            "byte " + lineDelim + ". Escape character will be ignored");
+      }
+    }
+  }
+
+  /**
+   * Analyzes columnDefs_ and partitionColDefs_ checking whether all column
+   * names are unique.
+   */
+  private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
+    Set<String> colNames = Sets.newHashSet();
+    for (ColumnDef colDef: columnDefs_) {
+      colDef.analyze();
+      if (!colNames.add(colDef.getColName().toLowerCase())) {
+        throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+      }
+    }
+    for (ColumnDef colDef: partitionColDefs_) {
+      colDef.analyze();
+      if (!colDef.getType().supportsTablePartitioning()) {
+        throw new AnalysisException(
+            String.format("Type '%s' is not supported as partition-column type " +
+                "in column: %s", colDef.getType().toSql(), colDef.getColName()));
+      }
+      if (!colNames.add(colDef.getColName().toLowerCase())) {
+        throw new AnalysisException("Duplicate column name: " + colDef.getColName());
+      }
+    }
+  }
+
+  /**
+   * Analyzes the Avro schema and compares it with the columnDefs_ to detect
+   * inconsistencies. Returns a list of column descriptors that should be
+   * used for creating the table (possibly identical to columnDefs_).
+   */
+  private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer)
+      throws AnalysisException {
+    Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO);
+    // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with latter
+    // taking precedence.
+    List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
+    schemaSearchLocations.add(serdeProperties_);
+    schemaSearchLocations.add(tblProperties_);
+    String avroSchema = null;
+    List<ColumnDef> avroCols = null; // parsed from avroSchema
+    try {
+      avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
+      if (avroSchema == null) {
+        // No Avro schema was explicitly set in the serde or table properties, so infer
+        // the Avro schema from the column definitions.
+        Schema inferredSchema = AvroSchemaConverter.convertColumnDefs(
+            columnDefs_, tableName_.toString());
+        avroSchema = inferredSchema.toString();
+      }
+      if (Strings.isNullOrEmpty(avroSchema)) {
+        throw new AnalysisException("Avro schema is null or empty: " +
+            tableName_.toString());
+      }
+      avroCols = AvroSchemaParser.parse(avroSchema);
+    } catch (SchemaParseException e) {
+      throw new AnalysisException(String.format(
+          "Error parsing Avro schema for table '%s': %s", tableName_.toString(),
+          e.getMessage()));
+    }
+    Preconditions.checkNotNull(avroCols);
+
+    // Analyze the Avro schema to detect inconsistencies with the columnDefs_.
+    // In case of inconsistencies, the column defs are ignored in favor of the Avro
+    // schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104).
+    StringBuilder warning = new StringBuilder();
+    List<ColumnDef> reconciledColDefs =
+        AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning);
+    if (warning.length() > 0) analyzer.addWarning(warning.toString());
+    return reconciledColDefs;
+  }
+
+  private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException {
+    // Validate that Kudu table is correctly specified.
+    if (!KuduTable.tableParamsAreValid(getTblProperties())) {
+      throw new AnalysisException("Kudu table is missing parameters " +
+          String.format("in table properties. Please verify if %s, %s, and %s are "
+                  + "present and have valid values.",
+              KuduTable.KEY_TABLE_NAME, KuduTable.KEY_MASTER_ADDRESSES,
+              KuduTable.KEY_KEY_COLUMNS));
+    }
+
+    // Kudu table cannot be a cached table
+    if (cachingOp_ != null) {
+      throw new AnalysisException("A Kudu table cannot be cached in HDFS.");
+    }
+
+    if (distributeParams_ != null) {
+      if (isExternal_) {
+        throw new AnalysisException(
+            "The DISTRIBUTE BY clause may not be specified for external tables.");
+      }
+
+      List<String> keyColumns = KuduUtil.parseKeyColumnsAsList(
+          getTblProperties().get(KuduTable.KEY_KEY_COLUMNS));
+      for (DistributeParam d : distributeParams_) {
+        // If the columns are not set, default to all key columns
+        if (d.getColumns() == null) d.setColumns(keyColumns);
+        d.analyze(analyzer);
+      }
+    } else if (!isExternal_) {
+      throw new AnalysisException(
+          "A data distribution must be specified using the DISTRIBUTE BY clause.");
+    }
+  }
+
+  private Byte analyzeRowFormatValue(String value) throws AnalysisException {
+    if (value == null) return null;
+    Byte byteVal = HdfsStorageDescriptor.parseDelim(value);
+    if (byteVal == null) {
+      throw new AnalysisException("ESCAPED BY values and LINE/FIELD " +
+          "terminators must be specified as a single character or as a decimal " +
+          "value in the range [-128:127]: " + value);
+    }
+    return byteVal;
+  }
+
+  /**
+   * Unescapes all values in the property map.
+   */
+  public static void unescapeProperties(Map<String, String> propertyMap) {
+    if (propertyMap == null) return;
+    for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
+      propertyMap.put(kv.getKey(),
+          new StringLiteral(kv.getValue()).getUnescapedValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java
new file mode 100644
index 0000000..46b0003
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import com.cloudera.impala.catalog.AggregateFunction;
+import com.cloudera.impala.catalog.Function;
+import com.cloudera.impala.catalog.PrimitiveType;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TFunctionBinaryType;
+import com.cloudera.impala.thrift.TSymbolType;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE AGGREGATE FUNCTION statement.
+ */
+public class CreateUdaStmt extends CreateFunctionStmtBase {
+  private final TypeDef intermediateTypeDef_;
+
+  /**
+   * Builds a CREATE AGGREGATE FUNCTION statement
+   * @param fnName - Name of the function
+   * @param fnArgs - List of types for the arguments to this function
+   * @param retType - The type this function returns.
+   * @param intermediateType_- The type used for the intermediate data.
+   * @param location - Path in HDFS containing the UDA.
+   * @param ifNotExists - If true, no errors are thrown if the function already exists
+   * @param additionalArgs - Key/Value pairs for additional arguments. The keys are
+   *        validated in analyze()
+   */
+  public CreateUdaStmt(FunctionName fnSymbol, FunctionArgs args,
+      TypeDef retTypeDef, TypeDef intermediateTypeDef,
+      HdfsUri location, boolean ifNotExists,
+      HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) {
+    super(fnSymbol, args, retTypeDef, location, ifNotExists, optArgs);
+    intermediateTypeDef_ = intermediateTypeDef;
+  }
+
+  private void reportCouldNotInferSymbol(String function) throws AnalysisException {
+    throw new AnalysisException("Could not infer symbol for "
+        + function + "() function.");
+  }
+
+  // Gets the symbol for 'arg'. If the user set it from the dll, return that. Otherwise
+  // try to infer the Symbol from the Update function. To infer the Symbol, the update
+  // function must contain "update" or "Update" and we switch that out with 'defaultSymbol'.
+  // Returns null if no symbol was found.
+  private String getSymbolSymbol(OptArg arg, String defaultSymbol) {
+    // First lookup if the user explicitly set it.
+    if (optArgs_.get(arg) != null) return optArgs_.get(arg);
+    // Try to match it from Update
+    String updateFn = optArgs_.get(OptArg.UPDATE_FN);
+    // Mangled strings start with _Z. We can't get substitute Symbols for mangled
+    // strings.
+    // TODO: this is doable in the BE with more symbol parsing.
+    if (updateFn.startsWith("_Z")) return null;
+
+    if (updateFn.contains("update")) return updateFn.replace("update", defaultSymbol);
+    if (updateFn.contains("Update")) {
+      char[] array = defaultSymbol.toCharArray();
+      array[0] = Character.toUpperCase(array[0]);
+      String s = new String(array);
+      return updateFn.replace("Update", s);
+    }
+    return null;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    Preconditions.checkNotNull(fn_);
+    Preconditions.checkState(fn_ instanceof AggregateFunction);
+    AggregateFunction uda = (AggregateFunction) fn_;
+
+    if (uda.getNumArgs() == 0) {
+      throw new AnalysisException("UDAs must take at least one argument.");
+    }
+
+    if (uda.getBinaryType() == TFunctionBinaryType.JAVA) {
+      throw new AnalysisException("Java UDAs are not supported.");
+    }
+
+    // TODO: these are temporarily restrictions since the BE cannot yet
+    // execute them.
+    if (uda.getBinaryType() == TFunctionBinaryType.IR) {
+      throw new AnalysisException("IR UDAs are not yet supported.");
+    }
+    if (fn_.hasVarArgs()) {
+      throw new AnalysisException("UDAs with varargs are not yet supported.");
+    }
+    if (fn_.getNumArgs() > 8) {
+      throw new AnalysisException(
+          "UDAs with more than 8 arguments are not yet supported.");
+    }
+
+    if (uda.getReturnType().getPrimitiveType() == PrimitiveType.CHAR) {
+      throw new AnalysisException("UDAs with CHAR return type are not yet supported.");
+    }
+    if (uda.getReturnType().getPrimitiveType() == PrimitiveType.VARCHAR) {
+      throw new AnalysisException("UDAs with VARCHAR return type are not yet supported.");
+    }
+    for (int i = 0; i < uda.getNumArgs(); ++i) {
+      if (uda.getArgs()[i].getPrimitiveType() == PrimitiveType.CHAR) {
+        throw new AnalysisException("UDAs with CHAR arguments are not yet supported.");
+      }
+      if (uda.getArgs()[i].getPrimitiveType() == PrimitiveType.VARCHAR) {
+        throw new AnalysisException("UDAs with VARCHAR arguments are not yet supported.");
+      }
+    }
+
+    Type intermediateType = null;
+    if (intermediateTypeDef_ == null) {
+      intermediateType = uda.getReturnType();
+    } else {
+      intermediateTypeDef_.analyze(analyzer);
+      intermediateType = intermediateTypeDef_.getType();
+    }
+    uda.setIntermediateType(intermediateType);
+
+    // Check arguments that are only valid in UDFs are not set.
+    checkOptArgNotSet(OptArg.SYMBOL);
+    checkOptArgNotSet(OptArg.PREPARE_FN);
+    checkOptArgNotSet(OptArg.CLOSE_FN);
+
+    // The user must provide the symbol for Update.
+    uda.setUpdateFnSymbol(uda.lookupSymbol(
+        checkAndGetOptArg(OptArg.UPDATE_FN), TSymbolType.UDF_EVALUATE, intermediateType,
+        uda.hasVarArgs(), uda.getArgs()));
+
+    // If the ddl did not specify the init/serialize/merge/finalize function
+    // Symbols, guess them based on the update fn Symbol.
+    Preconditions.checkNotNull(uda.getUpdateFnSymbol());
+    uda.setInitFnSymbol(getSymbolSymbol(OptArg.INIT_FN, "init"));
+    uda.setSerializeFnSymbol(getSymbolSymbol(OptArg.SERIALIZE_FN, "serialize"));
+    uda.setMergeFnSymbol(getSymbolSymbol(OptArg.MERGE_FN, "merge"));
+    uda.setFinalizeFnSymbol(getSymbolSymbol(OptArg.FINALIZE_FN, "finalize"));
+
+    // Init and merge are required.
+    if (uda.getInitFnSymbol() == null) reportCouldNotInferSymbol("init");
+    if (uda.getMergeFnSymbol() == null) reportCouldNotInferSymbol("merge");
+
+    // Validate that all set symbols exist.
+    uda.setInitFnSymbol(uda.lookupSymbol(uda.getInitFnSymbol(),
+        TSymbolType.UDF_EVALUATE, intermediateType, false));
+    uda.setMergeFnSymbol(uda.lookupSymbol(uda.getMergeFnSymbol(),
+        TSymbolType.UDF_EVALUATE, intermediateType, false, intermediateType));
+    if (uda.getSerializeFnSymbol() != null) {
+      try {
+        uda.setSerializeFnSymbol(uda.lookupSymbol(uda.getSerializeFnSymbol(),
+            TSymbolType.UDF_EVALUATE, null, false, intermediateType));
+      } catch (AnalysisException e) {
+        if (optArgs_.get(OptArg.SERIALIZE_FN) != null) {
+          throw e;
+        } else {
+          // Ignore, these symbols are optional.
+          uda.setSerializeFnSymbol(null);
+        }
+      }
+    }
+    if (uda.getFinalizeFnSymbol() != null) {
+      try {
+        uda.setFinalizeFnSymbol(uda.lookupSymbol(
+            uda.getFinalizeFnSymbol(), TSymbolType.UDF_EVALUATE, null, false,
+            intermediateType));
+      } catch (AnalysisException e) {
+        if (optArgs_.get(OptArg.FINALIZE_FN) != null) {
+          throw e;
+        } else {
+          // Ignore, these symbols are optional.
+          uda.setFinalizeFnSymbol(null);
+        }
+      }
+    }
+
+    // If the intermediate type is not the return type, then finalize is
+    // required.
+    if (!intermediateType.equals(fn_.getReturnType()) &&
+        uda.getFinalizeFnSymbol() == null) {
+      throw new AnalysisException("Finalize() is required for this UDA.");
+    }
+
+    sqlString_ = uda.toSql(ifNotExists_);
+  }
+
+  @Override
+  protected Function createFunction(FunctionName fnName, ArrayList<Type> argTypes,
+      Type retType, boolean hasVarArgs) {
+    return new AggregateFunction(fnName_, args_.getArgTypes(), retTypeDef_.getType(),
+        args_.hasVarArgs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
new file mode 100644
index 0000000..550d26f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.cloudera.impala.catalog.Db;
+import com.cloudera.impala.catalog.Function;
+import com.cloudera.impala.catalog.PrimitiveType;
+import com.cloudera.impala.catalog.ScalarFunction;
+import com.cloudera.impala.catalog.ScalarType;
+import com.cloudera.impala.catalog.Type;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.hive.executor.UdfExecutor.JavaUdfDataType;
+import com.cloudera.impala.thrift.TFunctionBinaryType;
+import com.cloudera.impala.thrift.TFunctionCategory;
+import com.cloudera.impala.thrift.TSymbolType;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE FUNCTION statement.
+ */
+public class CreateUdfStmt extends CreateFunctionStmtBase {
+  /**
+   * Builds a CREATE FUNCTION statement
+   * @param fnName - Name of the function
+   * @param fnArgs - List of types for the arguments to this function
+   * @param retType - The type this function returns.
+   * @param location - Path in HDFS containing the UDA.
+   * @param ifNotExists - If true, no errors are thrown if the function already exists
+   * @param additionalArgs - Key/Value pairs for additional arguments. The keys are
+   *        validated in analyze()
+   */
+  public CreateUdfStmt(FunctionName fnName, FunctionArgs args,
+      TypeDef retTypeDef, HdfsUri location, boolean ifNotExists,
+      HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) {
+    super(fnName, args, retTypeDef, location, ifNotExists, optArgs);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    Preconditions.checkNotNull(fn_);
+    Preconditions.checkNotNull(fn_ instanceof ScalarFunction);
+    ScalarFunction udf = (ScalarFunction) fn_;
+
+    if (hasSignature()) {
+      if (udf.getBinaryType() == TFunctionBinaryType.JAVA) {
+        if (!JavaUdfDataType.isSupported(udf.getReturnType())) {
+          throw new AnalysisException(
+              "Type " + udf.getReturnType().toSql() + " is not supported for Java UDFs.");
+        }
+        for (int i = 0; i < udf.getNumArgs(); ++i) {
+          if (!JavaUdfDataType.isSupported(udf.getArgs()[i])) {
+            throw new AnalysisException(
+                "Type " + udf.getArgs()[i].toSql() + " is not supported for Java UDFs.");
+          }
+        }
+      }
+
+      if (udf.getReturnType().getPrimitiveType() == PrimitiveType.CHAR) {
+        throw new AnalysisException("UDFs that use CHAR are not yet supported.");
+      }
+      if (udf.getReturnType().getPrimitiveType() == PrimitiveType.VARCHAR) {
+        throw new AnalysisException("UDFs that use VARCHAR are not yet supported.");
+      }
+      for (int i = 0; i < udf.getNumArgs(); ++i) {
+        if (udf.getArgs()[i].getPrimitiveType() == PrimitiveType.CHAR) {
+          throw new AnalysisException("UDFs that use CHAR are not yet supported.");
+        }
+        if (udf.getArgs()[i].getPrimitiveType() == PrimitiveType.VARCHAR) {
+          throw new AnalysisException("UDFs that use VARCHAR are not yet supported.");
+        }
+      }
+    }
+
+    // Check the user provided symbol exists
+    udf.setSymbolName(udf.lookupSymbol(
+        checkAndGetOptArg(OptArg.SYMBOL), TSymbolType.UDF_EVALUATE, null,
+        udf.hasVarArgs(), udf.getArgs()));
+
+    // Set optional Prepare/Close functions
+    String prepareFn = optArgs_.get(OptArg.PREPARE_FN);
+    if (prepareFn != null) {
+      udf.setPrepareFnSymbol(udf.lookupSymbol(prepareFn, TSymbolType.UDF_PREPARE));
+    }
+    String closeFn = optArgs_.get(OptArg.CLOSE_FN);
+    if (closeFn != null) {
+      udf.setCloseFnSymbol(udf.lookupSymbol(closeFn, TSymbolType.UDF_CLOSE));
+    }
+
+    // Udfs should not set any of these
+    checkOptArgNotSet(OptArg.UPDATE_FN);
+    checkOptArgNotSet(OptArg.INIT_FN);
+    checkOptArgNotSet(OptArg.SERIALIZE_FN);
+    checkOptArgNotSet(OptArg.MERGE_FN);
+    checkOptArgNotSet(OptArg.FINALIZE_FN);
+
+    sqlString_ = udf.toSql(ifNotExists_);
+
+    // Check that there is no function with the same name and isPersistent field not
+    // the same as udf.isPersistent_. For example we don't allow two JAVA udfs with
+    // same name and opposite persistence values set. This only applies for JAVA udfs
+    // as all the native udfs are persisted. Additionally we don't throw exceptions
+    // if "IF NOT EXISTS" is specified in the query.
+    if (udf.getBinaryType() != TFunctionBinaryType.JAVA || ifNotExists_) return;
+
+    Preconditions.checkNotNull(db_);
+    for (Function fn: db_.getFunctions(udf.functionName())) {
+      if (!hasSignature() || (hasSignature() && fn.isPersistent())) {
+        throw new AnalysisException(
+            String.format(Analyzer.FN_ALREADY_EXISTS_ERROR_MSG +
+                fn.signatureString()));
+      }
+    }
+  }
+
+  @Override
+  protected Function createFunction(FunctionName fnName, ArrayList<Type> argTypes, Type retType,
+      boolean hasVarArgs) {
+    return new ScalarFunction(fnName, argTypes, retType, hasVarArgs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
new file mode 100644
index 0000000..c38eef0
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.ArrayList;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.RuntimeEnv;
+import com.cloudera.impala.thrift.TAccessEvent;
+import com.cloudera.impala.thrift.TCatalogObjectType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a CREATE VIEW statement.
+ */
+public class CreateViewStmt extends CreateOrAlterViewStmtBase {
+
+  public CreateViewStmt(boolean ifNotExists, TableName tableName,
+      ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) {
+    super(ifNotExists, tableName, columnDefs, comment, viewDefStmt);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
+
+    tableName_.analyze();
+    // Use a child analyzer to let views have complex-typed columns.
+    Analyzer viewAnalyzerr = new Analyzer(analyzer);
+    // Enforce Hive column labels for view compatibility.
+    viewAnalyzerr.setUseHiveColLabels(true);
+    viewDefStmt_.analyze(viewAnalyzerr);
+
+    dbName_ = analyzer.getTargetDbName(tableName_);
+    owner_ = analyzer.getUser().getName();
+    if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) &&
+        !ifNotExists_) {
+      throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG +
+          String.format("%s.%s", dbName_, tableName_.getTbl()));
+    }
+    analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(),
+        TCatalogObjectType.VIEW, Privilege.CREATE.toString()));
+
+    createColumnAndViewDefs(analyzer);
+    if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
+      computeLineageGraph(analyzer);
+    }
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE VIEW ");
+    if (ifNotExists_) sb.append("IF NOT EXISTS ");
+    if (tableName_.getDb() != null) sb.append(tableName_.getDb() + ".");
+    sb.append(tableName_.getTbl() + " (");
+    sb.append(Joiner.on(", ").join(columnDefs_));
+    sb.append(") AS ");
+    sb.append(viewDefStmt_.toSql());
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
new file mode 100644
index 0000000..efa2117
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import java.util.List;
+
+import com.cloudera.impala.common.Pair;
+import com.cloudera.impala.planner.DataSink;
+import com.cloudera.impala.planner.KuduTableSink;
+import com.cloudera.impala.planner.TableSink;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.client.Delete;
+
+/**
+ * Representation of a DELETE statement.
+ *
+ * A delete statement contains three main parts, the target table reference, the from
+ * clause and the optional where clause. Syntactically, this is represented as follows:
+ *
+ *     DELETE [FROM] dotted_path [WHERE expr]
+ *     DELETE [table_alias] FROM table_ref_list [WHERE expr]
+ *
+ * Only the syntax using the explicit from clause can contain join conditions.
+ */
+public class DeleteStmt extends ModifyStmt {
+
+  public DeleteStmt(List<String> targetTablePath, FromClause tableRefs,
+      Expr wherePredicate, boolean ignoreNotFound) {
+    super(targetTablePath, tableRefs, Lists.<Pair<SlotRef, Expr>>newArrayList(),
+        wherePredicate, ignoreNotFound);
+  }
+
+  public DeleteStmt(DeleteStmt other) {
+    super(other.targetTablePath_, other.fromClause_.clone(),
+        Lists.<Pair<SlotRef, Expr>>newArrayList(), other.wherePredicate_.clone(),
+        other.ignoreNotFound_);
+  }
+
+  public DataSink createDataSink() {
+    // analyze() must have been called before.
+    Preconditions.checkState(table_ != null);
+    TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
+        ImmutableList.<Expr>of(), referencedColumns_, false, ignoreNotFound_);
+    Preconditions.checkState(!referencedColumns_.isEmpty());
+    return tableSink;
+  }
+
+  @Override
+  public DeleteStmt clone() {
+    return new DeleteStmt(this);
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder b = new StringBuilder();
+    b.append("DELETE");
+    if (ignoreNotFound_) b.append(" IGNORE");
+    if (fromClause_.size() > 1 || targetTableRef_.hasExplicitAlias()) {
+      b.append(" ");
+      if (targetTableRef_.hasExplicitAlias()) {
+        b.append(targetTableRef_.getExplicitAlias());
+      } else {
+        b.append(targetTableRef_.toSql());
+      }
+    }
+    b.append(fromClause_.toSql());
+    if (wherePredicate_ != null) {
+      b.append(" WHERE ");
+      b.append(wherePredicate_.toSql());
+    }
+    return b.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
new file mode 100644
index 0000000..0ddd6ec
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloudera.impala.analysis;
+
+import com.cloudera.impala.authorization.Privilege;
+import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.thrift.TDescribeDbParams;
+import com.cloudera.impala.thrift.TDescribeOutputStyle;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Represents a DESCRIBE DATABASE statement which returns metadata on
+ * a specified database:
+ * Syntax: DESCRIBE DATABASE [FORMATTED|EXTENDED] <db>
+ *
+ * If FORMATTED|EXTENDED is not specified, the statement only returns the given
+ * database's location and comment.
+ * If FORMATTED|EXTENDED is specified, extended metadata on the database is returned.
+ * This metadata includes info about the database's parameters, owner info
+ * and privileges.
+ */
+public class DescribeDbStmt extends StatementBase {
+  private final TDescribeOutputStyle outputStyle_;
+  private final String dbName_;
+
+  public DescribeDbStmt(String dbName, TDescribeOutputStyle outputStyle) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(dbName), "Invalid database name");
+    dbName_ = dbName;
+    outputStyle_ = outputStyle;
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder("DESCRIBE DATABASE ");
+    if (outputStyle_ != TDescribeOutputStyle.MINIMAL) {
+      sb.append(outputStyle_.toString() + " ");
+    }
+    return sb.toString() + dbName_;
+  }
+
+  public String getDb() { return dbName_; }
+  public TDescribeOutputStyle getOutputStyle() { return outputStyle_; }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    analyzer.getDb(dbName_, Privilege.VIEW_METADATA);
+  }
+
+  public TDescribeDbParams toThrift() {
+    TDescribeDbParams params = new TDescribeDbParams();
+    params.setDb(dbName_);
+    params.setOutput_style(outputStyle_);
+    return params;
+  }
+}