You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "paul-rogers (via GitHub)" <gi...@apache.org> on 2023/02/28 07:10:41 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #13686: Integrate the catalog with the Calcite planner

paul-rogers commented on code in PR #13686:
URL: https://github.com/apache/druid/pull/13686#discussion_r1119632427


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }

Review Comment:
   The source can also be a `WITH` clause. We didn't change the source when we searched for an `ORDER BY` clause.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know

Review Comment:
   - [ ] The word "odd" here means "unusual" - "different than the SQL semantics which the Calcite validator was designed to implement, and since they are different, we have to fight Calcite to do the opposite of what the thick, well-thought-out, time-tested, industry-standard SQL standard says we should do." In my mind, that is "unusual." Changed it to "non-standard".



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );

Review Comment:
   `INSERT` or `REPLACE`. This is more-or-less code that was moved from another place. Happy to clean it up, but I was going for minimal change for the stuff that got moved.
   
   The idea is that datasources (the Druid database files, not "data sources", the thing from which one can read data) reside in the `druid` schema. If the schema is `sys`, say, we know that the target table cannot be a datasource because datasources can live only in `druid`.
   
   Reworded the error message.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); // names of the form EXPR$<digits>, ignoring case
+
+  // Copied here from MSQE since that extension is not visible here. Query context key under which validateInsert stores the catalog's target segment row count.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext // services and settings the validator needs from its host
+  {
+    Map<String, Object> queryContextMap(); // written to by validateInsert (CTX_ROWS_PER_SEGMENT)
+    CatalogResolver catalog(); // consulted to learn whether ingestion requires an existing table
+    String druidSchemaName(); // the only schema accepted as the first part of a two-part target name
+    ObjectMapper jsonMapper(); // presumably used for JSON serialization during validation -- TODO confirm usage
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext; // kept for the Druid-specific INSERT/REPLACE checks below
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * On success, the validated node type of {@code insert} is set to the computed
+   * target type and, when catalog metadata supplies a non-null target segment row
+   * count, that value is stored in the query context map under
+   * {@link #CTX_ROWS_PER_SEGMENT}.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   *               without a prior type check
+   * @throws IAE if the statement is an UPSERT or carries a target column list; the
+   *             target-table validation invoked here throws {@code IAE} as well
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata. A null table yields null metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) { // indexOf returns -1 when the SELECT has no such column
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size: publish the catalog's value, if any, through the query context.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   *
+   * @param targetNamespace namespace of the target, validated here against the unknown type
+   * @param insertNs        identifier namespace used to read the target identifier and to
+   *                        resolve the table (the visible caller passes the same object as
+   *                        {@code targetNamespace}, cast)
+   * @param operationName   operation name used in error messages
+   * @return the result of unwrapping the resolved table as a {@link DatasourceTable}, or
+   *         {@code null} when the table was not found and the catalog does not require
+   *         an existing table
+   * @throws IAE if the name is empty, has more than two parts, names a schema other than
+   *             the Druid schema, if unwrapping the table as a datasource throws, or if
+   *             the table is missing while the catalog requires existing tables
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) { // cause deliberately dropped: only "not a datasource" is reported
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but it's the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      // Any other validation failure is not ours to interpret: propagate unchanged.
+      throw e;
+    }
+  }
+
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );

Review Comment:
   This is for ingestion. My understanding of changing rollup is that it occurs with existing data, through compaction. Would we instead use MSQ to write the datasource into itself? If so, then there would be more analysis needed to say, "you must use the declared grain when ingesting from an external source, but anything goes when ingesting from the same datasource as the target."
   
   My assumption is that if you don't want to have a fixed granularity, then don't specify it in the catalog: let it be specified on each ingestion. This is an "expert" mode for the wizards who know when to switch granularity from one ingestion to the next. For the mortal user, if they say they want X, we enforce X.
   
   The thought is that if we want to support "X, but sometimes Y, and Z after 30 days, then W after 90 days", don't specify a granularity now. Later, we can implement aging rules where someone could express such semantics in a way that compaction could do the job automatically. Clearly, we have to walk before we run, and this check is that initial walk step.
   
   There are managed-service use cases where users _do_ want to lock down granularity, so think of this preliminary approach as handling just that one use case: everyone else decides per-ingest what to use.
   
   @imply-cheddar, to understand what those future rules might be, can you explain under what scenarios we change the time grain? For different initial ingestions? For data aging via compaction? Something else?



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE); // names of the form EXPR$<digits>, ignoring case
+
+  // Copied here from MSQE since that extension is not visible here. Query context key under which validateInsert stores the catalog's target segment row count.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext // services and settings the validator needs from its host
+  {
+    Map<String, Object> queryContextMap(); // written to by validateInsert (CTX_ROWS_PER_SEGMENT)
+    CatalogResolver catalog(); // consulted to learn whether ingestion requires an existing table
+    String druidSchemaName(); // the only schema accepted as the first part of a two-part target name
+    ObjectMapper jsonMapper(); // presumably used for JSON serialization during validation -- TODO confirm usage
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext; // kept for the Druid-specific INSERT/REPLACE checks below
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * On success, the validated node type of {@code insert} is set to the computed
+   * target type and, when catalog metadata supplies a non-null target segment row
+   * count, that value is stored in the query context map under
+   * {@link #CTX_ROWS_PER_SEGMENT}.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   *               without a prior type check
+   * @throws IAE if the statement is an UPSERT or carries a target column list; the
+   *             target-table validation invoked here throws {@code IAE} as well
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata. A null table yields null metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) { // indexOf returns -1 when the SELECT has no such column
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size: publish the catalog's value, if any, through the query context.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);

Review Comment:
   In this case, that's fine. We don't care _why_ the item is not a datasource, only that it isn't. The error message is pretty clear: the thing you want to INSERT/REPLACE into isn't a datasource. We don't really know what it is, only that it's not what we support.
   
   We could be pedantic and log the error. But, what is the sys admin going to do about it? We could include the error for the user, who will find it useless. We could do our own checks rather than the Calcite ones if we're not permitted to catch the exception without one of the above two options. Or, we could leave it to the judgement of us poor code-stained wretches and assume that we know what we're doing.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new IAE("Invalid PARTITIONED BY granularity");
+    }

Review Comment:
   Good points. The existing code mucks with the context directly, and I preserved that here. The alternative would be to modify a Rel node based on this info, ensure that the info is preserved through the various rewrite rules, and then translate it via the native query rules into... a context setting.
   
   Since this PR already introduced many complex changes, I'll leave this idea as a refinement later when it can be considered in isolation.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }

Review Comment:
   Sorry you find the code hard to read! As it turns out, the alternative doesn't handle all the cases. The revised logic is the following. Perhaps it is easier to read.
   
   ```java
       if (definedGranularity == null) { 
         if (ingestionGranularity == null) { 
           // Neither have a value: error
           throw new IAE(...);
         } else { 
           finalGranularity = ingestionGranularity; 
         } 
       } else { 
         if (ingestionGranularity == null) {
           // The query has no granularity: just apply the catalog granularity.
           finalGranularity = definedGranularity; 
         } else if (definedGranularity.equals(ingestionGranularity)) { 
           // Both have a setting. They have to be the same.
           finalGranularity = definedGranularity; 
         } else { 
           throw new IAE(...);
         } 
       }
   ```



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  /**
+   * Services and settings the validator obtains from its creator.
+   */
+  public interface ValidatorContext
+  {
+    /** Query context map; the validator writes ingestion settings into it. */
+    Map<String, Object> queryContextMap();
+    /** Catalog resolver, consulted for whether ingestion requires an existing table. */
+    CatalogResolver catalog();
+    /** Name of the schema that a two-part ingestion target name must use. */
+    String druidSchemaName();
+    /** JSON mapper used to serialize the segment granularity. */
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * Side effects: the source query is rewritten to carry the clustering as an
+   * {@code ORDER BY}, and the segment granularity and (when the catalog defines it)
+   * the target rows per segment are written into the query context map.
+   *
+   * @param insert the INSERT or REPLACE node; it is cast to {@link DruidSqlIngest} unconditionally
+   * @throws IAE if the statement is an UPSERT, names a target column list, or fails
+   *         the target-table or {@code PARTITIONED BY} checks
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource. A null table means a new datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column, if the SELECT produces one (index is -1 otherwise).
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size: pass the catalog's target rows per segment, if set, via the query context.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reconcile the segment granularity defined in the catalog with the one given in the
+   * statement's {@code PARTITIONED BY} clause, then write the result, serialized as
+   * JSON, into the query context under {@code DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY}.
+   * Either source may be absent, but not both; if both are present they must be equal.
+   *
+   * @param operationName statement name used in error messages
+   * @param ingestNode    the ingest statement, source of the {@code PARTITIONED BY} clause
+   * @param tableMetadata catalog metadata for the target, or {@code null} if there is none
+   * @throws IAE if neither source provides a granularity, if the two disagree, or if the
+   *         chosen granularity cannot be serialized
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Keep the serialization failure as the cause so the root problem stays visible.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    if (Granularities.ALL == gran) {
+      return "ALL TIME";
+    }
+    if (!(gran instanceof PeriodGranularity)) {
+      return gran.toString();
+    }

Review Comment:
   This code path will only ever see the `ALL` granularity or `PeriodGranularity`. Added a comment and removed the "catch all" `toString()`.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  /**
+   * Services and settings the validator obtains from its creator.
+   */
+  public interface ValidatorContext
+  {
+    /** Query context map; the validator writes ingestion settings into it. */
+    Map<String, Object> queryContextMap();
+    /** Catalog resolver, consulted for whether ingestion requires an existing table. */
+    CatalogResolver catalog();
+    /** Name of the schema that a two-part ingestion target name must use. */
+    String druidSchemaName();
+    /** JSON mapper used to serialize the segment granularity. */
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * Side effects: the source query is rewritten to carry the clustering as an
+   * {@code ORDER BY}, and the segment granularity and (when the catalog defines it)
+   * the target rows per segment are written into the query context map.
+   *
+   * @param insert the INSERT or REPLACE node; it is cast to {@link DruidSqlIngest} unconditionally
+   * @throws IAE if the statement is an UPSERT, names a target column list, or fails
+   *         the target-table or {@code PARTITIONED BY} checks
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource. A null table means a new datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column, if the SELECT produces one (index is -1 otherwise).
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size: pass the catalog's target rows per segment, if set, via the query context.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reconcile the segment granularity defined in the catalog with the one given in the
+   * statement's {@code PARTITIONED BY} clause, then write the result, serialized as
+   * JSON, into the query context under {@code DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY}.
+   * Either source may be absent, but not both; if both are present they must be equal.
+   *
+   * @param operationName statement name used in error messages
+   * @param ingestNode    the ingest statement, source of the {@code PARTITIONED BY} clause
+   * @param tableMetadata catalog metadata for the target, or {@code null} if there is none
+   * @throws IAE if neither source provides a granularity, if the two disagree, or if the
+   *         chosen granularity cannot be serialized
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Keep the serialization failure as the cause so the root problem stays visible.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  /**
+   * Render a granularity for use in error messages.
+   *
+   * @param gran the granularity to render; may be {@code null}
+   * @return {@code "NULL"} for a null granularity, {@code "ALL TIME"} for the ALL
+   *         granularity, the period's string form for a {@link PeriodGranularity},
+   *         and {@code toString()} for anything else
+   */
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    // Compare by value: an equal ALL granularity need not be the Granularities.ALL
+    // instance itself, for example when it was deserialized.
+    if (Granularities.ALL.equals(gran)) {
+      return "ALL TIME";
+    }
+    if (!(gran instanceof PeriodGranularity)) {
+      return gran.toString();
+    }
+    return ((PeriodGranularity) gran).getPeriod().toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }

Review Comment:
   This is preserved from the existing code, just moved here. Perhaps we should have used `ORDER BY` as an alias for `CLUSTERED BY`. This is something we can refine later. 



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * Side effects: the resolved segment granularity is written to the query context,
+   * as is the target rows-per-segment value when the catalog metadata provides one.
+   * The validated type of the {@code insert} node is set to the computed target type.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   * @throws IAE if the statement is an upsert or carries a target column list
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reconciles the segment granularity defined in the catalog with the one given by
+   * the query's {@code PARTITIONED BY} clause, then records the result in the query
+   * context under {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY}.
+   * <p>
+   * At least one of the two must be present. When both are present they must be equal.
+   *
+   * @param operationName name of the statement, used in error messages
+   * @param ingestNode    the ingest node that supplies the {@code PARTITIONED BY} clause
+   * @param tableMetadata catalog metadata for the target, or {@code null} if there is none
+   * @throws IAE if neither granularity is given, if the two disagree, or if the
+   *             chosen granularity cannot be serialized
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Attach the serialization failure as the cause instead of discarding it.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  /**
+   * Renders a segment granularity for use in error messages.
+   *
+   * @param gran the granularity to describe; may be {@code null}
+   * @return {@code "NULL"} for a null granularity, {@code "ALL TIME"} for
+   *         {@link Granularities#ALL}, the period's string form for a
+   *         {@link PeriodGranularity}, otherwise {@code gran.toString()}
+   */
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    // Compare with equals() rather than ==, so the check does not depend on the
+    // caller handing us the shared constant instance.
+    if (Granularities.ALL.equals(gran)) {
+      return "ALL TIME";
+    }
+    if (gran instanceof PeriodGranularity) {
+      return ((PeriodGranularity) gran).getPeriod().toString();
+    }
+    return gran.toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }
+
+    if (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }

Review Comment:
   Renamed the method. What we're doing here is looking for the various ways that an ORDER BY could sneak into the query: via `INSERT ... ORDER BY`, `INSERT SELECT ... ORDER BY` or `INSERT WITH ... (SELECT ... ORDER BY)...`.
   
   Calcite will do the actual validation a bit later. Here we're just inspecting the parse node for a disallowed pattern.
   



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * Side effects: the resolved segment granularity is written to the query context,
+   * as is the target rows-per-segment value when the catalog metadata provides one.
+   * The validated type of the {@code insert} node is set to the computed target type.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   * @throws IAE if the statement is an upsert or carries a target column list
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reconciles the segment granularity defined in the catalog with the one given by
+   * the query's {@code PARTITIONED BY} clause, then records the result in the query
+   * context under {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY}.
+   * <p>
+   * At least one of the two must be present. When both are present they must be equal.
+   *
+   * @param operationName name of the statement, used in error messages
+   * @param ingestNode    the ingest node that supplies the {@code PARTITIONED BY} clause
+   * @param tableMetadata catalog metadata for the target, or {@code null} if there is none
+   * @throws IAE if neither granularity is given, if the two disagree, or if the
+   *             chosen granularity cannot be serialized
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Attach the serialization failure as the cause instead of discarding it.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  /**
+   * Renders a segment granularity for use in error messages.
+   *
+   * @param gran the granularity to describe; may be {@code null}
+   * @return {@code "NULL"} for a null granularity, {@code "ALL TIME"} for
+   *         {@link Granularities#ALL}, the period's string form for a
+   *         {@link PeriodGranularity}, otherwise {@code gran.toString()}
+   */
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    // Compare with equals() rather than ==, so the check does not depend on the
+    // caller handing us the shared constant instance.
+    if (Granularities.ALL.equals(gran)) {
+      return "ALL TIME";
+    }
+    if (gran instanceof PeriodGranularity) {
+      return ((PeriodGranularity) gran).getPeriod().toString();
+    }
+    return gran.toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }
+
+    if (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    if (source instanceof SqlSelect) {
+      SqlSelect select = (SqlSelect) source;
+      orderByList = select.getOrderList();
+      if (orderByList != null && orderByList.size() != 0) {
+        throw new IAE(
+            "ORDER BY is not supported within %s %s statement, use CLUSTERED BY instead.",
+            statementArticle(operationName),
+            operationName
+        );
+      }
+    } else {
+      throw new IAE(
+          "%s is not supported within %s %s statement.",
+          source.getKind(),
+          statementArticle(operationName),
+          operationName
+      );
+    }
+  }
+
+  /**
+   * Picks the indefinite article that precedes the statement name in error messages.
+   *
+   * @param operationName name of the statement
+   * @return {@code "an"} when the name is exactly {@code INSERT}, otherwise {@code "a"}
+   */
+  private String statementArticle(String operationName)
+  {
+    if ("INSERT".equals(operationName)) {
+      return "an";
+    }
+    return "a";
+  }
+
+  private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null, SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+          );
+      SqlNode keyNode;

Review Comment:
   The compiler already complains if a local variable is used before it is set, but Druid's convention is to mark such locals `final`, so I added `final` everywhere it made sense.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  /**
+   * Planner-side services and settings that this validator reads while
+   * validating an ingest statement.
+   */
+  public interface ValidatorContext
+  {
+    // Query context of the statement being validated. The validator writes
+    // derived values into this map (segment granularity, rows per segment),
+    // so the returned map must accept put().
+    Map<String, Object> queryContextMap();
+    // Catalog resolver; consulted to decide whether an ingest requires the
+    // target table to exist already.
+    CatalogResolver catalog();
+    // Name of the schema that holds datasources. It is the only schema accepted
+    // as the first part of a two-part target table name.
+    String druidSchemaName();
+    // Mapper used to serialize the resolved segment granularity into the
+    // query context.
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Resolve the segment granularity from the query's PARTITIONED BY clause and the
+   * catalog definition (if any), and record the result in the query context.
+   * <p>
+   * Either source may supply the granularity. If both do, they must be equal.
+   * If neither does, the statement is rejected.
+   *
+   * @param operationName name of the operation, used in error messages
+   * @param ingestNode    the ingest statement that carries the PARTITIONED BY clause
+   * @param tableMetadata catalog metadata for the target, or {@code null} if none
+   * @throws IAE if no granularity is available, if the catalog and query disagree,
+   *         or if the resolved granularity cannot be serialized to JSON
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Keep the serialization failure as the cause so it can be diagnosed.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    if (Granularities.ALL == gran) {
+      return "ALL TIME";
+    }
+    if (!(gran instanceof PeriodGranularity)) {
+      return gran.toString();
+    }
+    return ((PeriodGranularity) gran).getPeriod().toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }
+
+    if (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    if (source instanceof SqlSelect) {
+      SqlSelect select = (SqlSelect) source;
+      orderByList = select.getOrderList();
+      if (orderByList != null && orderByList.size() != 0) {
+        throw new IAE(
+            "ORDER BY is not supported within %s %s statement, use CLUSTERED BY instead.",
+            statementArticle(operationName),
+            operationName
+        );
+      }
+    } else {
+      throw new IAE(
+          "%s is not supported within %s %s statement.",
+          source.getKind(),
+          statementArticle(operationName),
+          operationName
+      );
+    }
+  }
+
+  /**
+   * Picks the indefinite article that reads correctly in front of the operation
+   * name in an error message: "an" for {@code INSERT}, "a" for anything else.
+   */
+  private String statementArticle(String operationName)
+  {
+    if ("INSERT".equals(operationName)) {
+      return "an";
+    }
+    return "a";
+  }
+
+  private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null, SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+          );
+      SqlNode keyNode;
+      if (keyCol.desc()) {
+        keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent);
+      } else {
+        keyNode = colIdent;
+      }
+      keyNodes.add(keyNode);
+    }
+    return keyNodes;
+  }
+
+  /**
+   * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as
+   * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation
+   * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back
+   * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not
+   * actually specify (it is an error to do so). However, with the current hybrid structure, it is
+   * not possible to add the ORDER BY later: doing so requires access to the ORDER BY namespace,
+   * which is not visible to subclasses.
+   *
+   * @param source            the source query of the ingest statement
+   * @param ingestNode        the ingest statement that carries the CLUSTERED BY clause
+   * @param catalogClustering clustering from the catalog, or {@code null} if none
+   */
+  private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, SqlNodeList catalogClustering)
+  {
+    // The query's CLUSTERED BY wins; the catalog's keys are used only when the
+    // query gives none. With neither, there is nothing to push down.
+    SqlNodeList clusteredBy = ingestNode.getClusteredBy();
+    if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
+      if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
+        return;
+      }
+      clusteredBy = catalogClustering;
+    }
+    // Skip any WITH wrappers (operand 1 is presumably the WITH body).
+    while (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    // The cast relies on validateInsertSelect() having run first, as it does in
+    // validateInsert().
+    SqlSelect select = (SqlSelect) source;
+
+    // This part is a bit sad. By the time we get here, the validator will have created
+    // the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the
+    // work that registerQuery() should have done. That's kind of OK. But, the orderScopes
+    // variable is private, so we have to play dirty tricks to get at it.
+    select.setOrderBy(clusteredBy);
+    OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select);
+    try {
+      // Reflective access to the private "orderScopes" field of SqlValidatorImpl.
+      @SuppressWarnings("unchecked")
+      Map<SqlSelect, SqlValidatorScope> orderScopes = (Map<SqlSelect, SqlValidatorScope>) FieldUtils.getDeclaredField(SqlValidatorImpl.class, "orderScopes", true).get(this);
+      orderScopes.put(select, orderScope);
+    }
+    catch (Exception e) {
+      throw new ISE(e, "orderScopes is not accessible");
+    }
+  }
+
+  private void validateTimeColumn(RelRecordType sourceType, int timeColumnIndex)
+  {
+    RelDataTypeField timeCol = sourceType.getFieldList().get(timeColumnIndex);
+    RelDataType timeColType = timeCol.getType();
+    if (timeColType instanceof BasicSqlType) {
+      BasicSqlType timeColSqlType = (BasicSqlType) timeColType;
+      SqlTypeName timeColSqlTypeName = timeColSqlType.getSqlTypeName();
+      if (timeColSqlTypeName == SqlTypeName.BIGINT || timeColSqlTypeName == SqlTypeName.TIMESTAMP) {
+        return;
+      }
+    }
+    throw new IAE(
+        "Invalid %s column type %s: must be BIGINT or TIMESTAMP",
+        timeCol.getName(),
+        timeColType.toString()
+    );
+  }
+
+  /**
+   * Check the clustering keys, which may be given by the query, the catalog, or
+   * both. Each list that is present is validated on its own against the SELECT
+   * row type. When both are present they must also agree with each other.
+   *
+   * @param sourceType        row type of the source query
+   * @param timeColumnIndex   index of the {@code __time} column, or -1 if absent
+   * @param ingestNode        the ingest statement that carries the CLUSTERED BY clause
+   * @param catalogClustering clustering from the catalog, or {@code null} if none
+   */
+  private void validateClustering(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final DruidSqlIngest ingestNode,
+      final SqlNodeList catalogClustering
+  )
+  {
+    final SqlNodeList queryClustering = ingestNode.getClusteredBy();
+    final boolean hasQueryKeys = queryClustering != null;
+    final boolean hasCatalogKeys = catalogClustering != null;
+
+    // Validate each definition separately first, so that the comparison below
+    // only ever sees sane lists.
+    if (hasQueryKeys) {
+      validateClusteredBy(sourceType, timeColumnIndex, queryClustering);
+    }
+    if (hasCatalogKeys) {
+      // The catalog's key columns must be present in the query.
+      validateClusteredBy(sourceType, timeColumnIndex, catalogClustering);
+    }
+    if (hasQueryKeys && hasCatalogKeys) {
+      verifyQueryClusterByMatchesCatalog(sourceType, catalogClustering, queryClustering);
+    }
+  }
+
+  /**
+   * Validate one CLUSTERED BY list. Members can be any of the following:
+   * <p>
+   * {@code CLUSTERED BY [<ordinal> | <id> | <expr>] DESC?}
+   * <p>
+   * A key given as an ordinal or identifier must resolve to a SELECT column, must
+   * not be the time column, and must not name a column already used by an earlier
+   * key. A key given as an expression is not checked here.
+   *
+   * @param sourceType      row type of the source query
+   * @param timeColumnIndex index of the {@code __time} column, or -1 if absent
+   * @param clusteredBy     the key list to check
+   * @throws IAE if a key names the time column or repeats a column
+   */
+  private void validateClusteredBy(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final SqlNodeList clusteredBy
+  )
+  {
+    final List<String> fieldNames = sourceType.getFieldNames();
+    // One flag per SELECT column: set once a key has referenced that column.
+    final boolean[] seen = new boolean[fieldNames.size()];
+
+    for (SqlNode clusterKey : clusteredBy) {
+      final Pair<Integer, Boolean> resolved = resolveClusterKey(clusterKey, fieldNames);
+      if (resolved == null) {
+        // An expression: it was validated as part of the ORDER BY check.
+        continue;
+      }
+      final int column = resolved.left;
+      // Can't cluster by __time
+      if (column == timeColumnIndex) {
+        throw new IAE("Do not include %s in the CLUSTERED BY clause: it is managed by PARTITIONED BY", Columns.TIME_COLUMN);
+      }
+      // No duplicate references
+      if (seen[column]) {
+        throw new IAE("Duplicate CLUSTERED BY key: %s", clusterKey);
+      }
+      seen[column] = true;
+    }
+  }
+
+  private Pair<Integer, Boolean> resolveClusterKey(SqlNode clusterKey, final List<String> fieldNames)
+  {
+    boolean desc = false;
+
+    // Check if the key is compound: only occurs for DESC. The ASC
+    // case is abstracted away by the parser.
+    if (clusterKey instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = (SqlBasicCall) clusterKey;
+      if (basicCall.getOperator() == SqlStdOperatorTable.DESC) {
+        // Cluster key is compound: CLUSTERED BY foo DESC
+        // We check only the first element
+        clusterKey = ((SqlBasicCall) clusterKey).getOperandList().get(0);
+        desc = true;
+      }
+    }
+
+    // We now have the actual key. Handle the three cases.
+    if (clusterKey instanceof SqlNumericLiteral) {
+      // Key is an ordinal: CLUSTERED BY 2
+      // Ordinals are 1-based.
+      int ord = ((SqlNumericLiteral) clusterKey).intValue(true);
+      int index = ord - 1;
+
+      // The ordinal has to be in range.
+      if (index < 0 || fieldNames.size() <= index) {
+        throw new IAE("CLUSTERED BY ordinal %d is not valid", ord);
+      }
+      return new Pair<>(index, desc);
+    } else if (clusterKey instanceof SqlIdentifier) {
+      // Key is an identifier: CLUSTERED BY foo
+      SqlIdentifier key = (SqlIdentifier) clusterKey;
+
+      // Only key of the form foo are allowed, not foo.bar
+      if (!key.isSimple()) {
+        throw new IAE("CLUSTERED BY keys must be a simple name: '%s'", key.toString());
+      }
+
+      // The name must match an item in the select list
+      String keyName = key.names.get(0);
+      // Slow linear search. We assume that there are not many cluster keys.
+      int index = fieldNames.indexOf(keyName);
+      if (index == -1) {
+        throw new IAE("CLUSTERED BY key column '%s' is not valid", keyName);
+      }
+      return new Pair<>(index, desc);
+    } else {
+      // Key is an expression: CLUSTERED BY CEIL(m2)
+      return null;
+    }
+  }
+
+  /**
+   * Both the catalog and query define clustering. This is allowed as long as they
+   * are identical.
+   */
+  private void verifyQueryClusterByMatchesCatalog(
+      final RelRecordType sourceType,
+      final SqlNodeList catalogClustering,
+      final SqlNodeList clusteredBy
+  )
+  {
+    if (clusteredBy.size() != catalogClustering.size()) {
+      throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+    }
+    List<String> fieldNames = sourceType.getFieldNames();
+    for (int i = 0; i < clusteredBy.size(); i++) {
+      SqlNode catalogKey = catalogClustering.get(i);
+      SqlNode clusterKey = clusteredBy.get(i);
+      Pair<Integer, Boolean> catalogPair = resolveClusterKey(catalogKey, fieldNames);
+      Pair<Integer, Boolean> queryPair = resolveClusterKey(clusterKey, fieldNames);
+
+      // Cluster keys in the catalog must be field references. If unresolved,
+      // we would have gotten an error above. Here we make sure that both
+      // indexes are the same. Since the catalog index can't be null, we're
+      // essentially checking that the indexes are the same: they name the same
+      // column.
+      if (!Objects.equals(catalogPair, queryPair)) {
+        throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+      }
+    }
+  }
+
+  /**
+   * Build the exception that reports that the query's CLUSTERED BY keys differ
+   * from those defined in the catalog. The exception is returned, not thrown, so
+   * that callers can write {@code throw clusterKeyMismatchException(...)} and the
+   * compiler sees that their control flow ends there.
+   *
+   * @param catalogClustering clustering from the catalog
+   * @param clusterKeys       clustering from the query
+   * @return the exception for the caller to throw
+   */
+  private RuntimeException clusterKeyMismatchException(SqlNodeList catalogClustering, SqlNodeList clusterKeys)
+  {
+    return new IAE(
+        "CLUSTERED BY mismatch. Catalog: [%s], query: [%s]",
+        catalogClustering,
+        clusterKeys
+    );
+  }
+
+  /**
+   * Compute and validate the target type. In normal SQL, the engine would insert
+   * a project operator after the SELECT before the write to cast columns from the
+   * input type to the (compatible) defined output type. Druid doesn't work that way.
+   * In MSQ, the output type is just the input type. If the user wants to control the
+   * output type, then the user must manually insert any required CAST: Druid is not
+   * in the business of changing the type to suit the catalog.
+   * <p>
+   * As a result, we first propagate column names and types using Druid rules: the
+   * output is exactly what SELECT says it is. We then apply restrictions from the
+   * catalog. If the table is strict, only column names from the catalog can be
+   * used.
+   */
+  private RelDataType validateTargetType(SqlInsert insert, RelRecordType sourceType, DatasourceFacade tableMetadata)
+  {
+    final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      // Check that there are no unnamed columns in the insert.
+      if (UNNAMED_COLUMN_PATTERN.matcher(colName).matches()) {
+        throw new IAE("Expressions must provide an alias to specify the target column: func(X) AS myColumn");
+      }
+    }
+    if (tableMetadata == null) {
+      return sourceType;
+    }
+    final boolean isStrict = tableMetadata.isSealed();
+    final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      ColumnFacade definedCol = tableMetadata.column(colName);
+      if (definedCol == null) {
+        // No catalog definition for this column.
+        if (isStrict) {
+          // Table is strict: cannot add new columns at ingest time.
+          throw new IAE(
+              "Target column \"%s\".\"%s\" is not defined",
+              insert.getTargetTable(),
+              colName
+          );
+        }
+
+        // Table is not strict: add a new column based on the SELECT column.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // If the column name is defined, but no type is given then, use the
+      // column type from SELECT.
+      if (!definedCol.hasType()) {
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // Both the column name and type are provided. Use the name and type
+      // from the catalog.
+      // TODO: Handle error if type not found

Review Comment:
   You are asking that the code be perfect, or that we leave readers in the dark when things are uncertain. Changing the word "TODO" into "A word of caution for future readers" is just playing word games. However, you seem to feel strongly about this, so I rewrote the comment.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  /**
+   * Services and settings that the planner supplies to this validator for use
+   * while validating an ingest statement.
+   */
+  public interface ValidatorContext
+  {
+    // Query context of the statement being validated. The validator writes derived
+    // values (segment granularity, rows per segment) into it, so it must accept put().
+    Map<String, Object> queryContextMap();
+    // Catalog resolver; asked whether ingest requires the target table to already exist.
+    CatalogResolver catalog();
+    // Schema name that a two-part ingest target name must use as its first part.
+    String druidSchemaName();
+    // JSON mapper used to serialize the resolved segment granularity into the query context.
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
+  /**
+   * Creates the validator. The first four arguments are passed unchanged to the
+   * base validator; {@code validatorContext} is retained for use by the
+   * Druid-specific ingest validation in this class.
+   */
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * Side effects: the resolved segment granularity and, when the catalog defines it,
+   * the target rows per segment are written into the query context map; the
+   * clustering keys are installed as the source SELECT's ORDER BY; and the computed
+   * target row type is recorded as the validated type of {@code insert}.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   * @throws IAE if the statement is an upsert, names a target column list, or fails
+   *         any of the checks performed by the helper methods called here
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    // NOTE(review): presumably the Druid parser only ever produces DruidSqlIngest
+    // nodes here; any other SqlInsert would fail this cast.
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    // The operator name is reused in every error message below.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    // A null table means the target does not exist yet and will be created.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      // Not a bare SELECT (e.g. a WITH wrapper): validate as a general query.
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    // The source has now been validated, so its row type can be read.
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    // The index is -1 when the SELECT does not produce a __time column.
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    // Only set when the catalog defines a value; otherwise the context is left untouched.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Resolve the segment granularity from the catalog and the query's
+   * {@code PARTITIONED BY} clause, then write the result, serialized as JSON,
+   * into the query context. If only one side defines a granularity, that one
+   * is used. If both do, they must be equal.
+   *
+   * @param operationName name of the statement operator, used in error messages
+   * @param ingestNode    the ingest statement, source of the query granularity
+   * @param tableMetadata catalog metadata for the target, or {@code null} if none
+   * @throws IAE if neither side defines a granularity, the two disagree, a
+   *         granularity is unsupported, or the result cannot be serialized
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Keep the cause so the underlying serialization failure is not lost.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  /**
+   * Render a granularity for use in an error message.
+   *
+   * @param gran the granularity to render; may be {@code null}
+   * @return {@code "NULL"} for a null granularity, {@code "ALL TIME"} for the
+   *         "all" granularity, the period for a period granularity, and the
+   *         granularity's own string form otherwise
+   */
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    // Compare by value, not identity: a granularity built elsewhere (for example,
+    // deserialized) need not be the same instance as the shared constant.
+    if (Granularities.ALL.equals(gran)) {
+      return "ALL TIME";
+    }
+    if (!(gran instanceof PeriodGranularity)) {
+      return gran.toString();
+    }
+    return ((PeriodGranularity) gran).getPeriod().toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }
+
+    if (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    if (source instanceof SqlSelect) {
+      SqlSelect select = (SqlSelect) source;
+      orderByList = select.getOrderList();
+      if (orderByList != null && orderByList.size() != 0) {
+        throw new IAE(
+            "ORDER BY is not supported within %s %s statement, use CLUSTERED BY instead.",
+            statementArticle(operationName),
+            operationName
+        );
+      }
+    } else {
+      throw new IAE(
+          "%s is not supported within %s %s statement.",
+          source.getKind(),
+          statementArticle(operationName),
+          operationName
+      );
+    }
+  }
+
+  /**
+   * Choose the indefinite article that reads correctly before the operation
+   * name in an error message: "an" for {@code INSERT}, "a" for anything else.
+   */
+  private String statementArticle(String operationName)
+  {
+    if ("INSERT".equals(operationName)) {
+      return "an";
+    }
+    return "a";
+  }
+
+  private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null, SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+          );
+      SqlNode keyNode;
+      if (keyCol.desc()) {
+        keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent);
+      } else {
+        keyNode = colIdent;
+      }
+      keyNodes.add(keyNode);
+    }
+    return keyNodes;
+  }
+
+  /**
+   * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as
+   * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation
+   * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back
+   * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not
+   * actually specify (it is an error to do so.) However, with the current hybrid structure, it is
+   * not possible to add the ORDER BY later: doing so requires access to the order by namespace
+   * which is not visible to subclasses.
+   *
+   * @param source            the source query of the ingest statement
+   * @param ingestNode        the ingest statement, source of the query's CLUSTERED BY list
+   * @param catalogClustering cluster keys from the catalog, used only when the query
+   *                          provides none; may be {@code null}
+   * @throws ISE if the validator's private {@code orderScopes} field cannot be accessed
+   */
+  private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, SqlNodeList catalogClustering)
+  {
+    // The query's CLUSTERED BY wins; fall back to the catalog keys; do nothing if neither exists.
+    SqlNodeList clusteredBy = ingestNode.getClusteredBy();
+    if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
+      if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
+        return;
+      }
+      clusteredBy = catalogClustering;
+    }
+    // Look through WITH wrappers to reach the query body.
+    while (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    // The cast relies on validateInsert() having run validateInsertSelect() first,
+    // which rejects any source whose body is not a SELECT.
+    SqlSelect select = (SqlSelect) source;
+
+    // This part is a bit sad. By the time we get here, the validator will have created
+    // the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the
+    // work that registerQuery() should have done. That's kind of OK. But, the orderScopes
+    // variable is private, so we have to play dirty tricks to get at it.
+    select.setOrderBy(clusteredBy);
+    OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select);
+    try {
+      // Reflectively read the base validator's private map and register the new scope in it.
+      @SuppressWarnings("unchecked")
+      Map<SqlSelect, SqlValidatorScope> orderScopes = (Map<SqlSelect, SqlValidatorScope>) FieldUtils.getDeclaredField(SqlValidatorImpl.class, "orderScopes", true).get(this);
+      orderScopes.put(select, orderScope);
+    }
+    catch (Exception e) {
+      throw new ISE(e, "orderScopes is not accessible");
+    }
+  }
+
+  private void validateTimeColumn(RelRecordType sourceType, int timeColumnIndex)
+  {
+    RelDataTypeField timeCol = sourceType.getFieldList().get(timeColumnIndex);
+    RelDataType timeColType = timeCol.getType();
+    if (timeColType instanceof BasicSqlType) {
+      BasicSqlType timeColSqlType = (BasicSqlType) timeColType;
+      SqlTypeName timeColSqlTypeName = timeColSqlType.getSqlTypeName();
+      if (timeColSqlTypeName == SqlTypeName.BIGINT || timeColSqlTypeName == SqlTypeName.TIMESTAMP) {
+        return;
+      }
+    }
+    throw new IAE(
+        "Invalid %s column type %s: must be BIGINT or TIMESTAMP",
+        timeCol.getName(),
+        timeColType.toString()
+    );
+  }
+
+  /**
+   * Check the cluster keys, which may be given by the query, by the catalog, or by
+   * both. Each list that is present is validated against the SELECT row type. When
+   * both are present, they must also name the same keys.
+   *
+   * @param sourceType        row type of the source SELECT
+   * @param timeColumnIndex   position of the time column in the row type, or -1
+   * @param ingestNode        the ingest statement, source of the query's keys
+   * @param catalogClustering cluster keys from the catalog; may be {@code null}
+   */
+  private void validateClustering(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final DruidSqlIngest ingestNode,
+      final SqlNodeList catalogClustering
+  )
+  {
+    final SqlNodeList queryClustering = ingestNode.getClusteredBy();
+    final boolean hasQueryKeys = queryClustering != null;
+    final boolean hasCatalogKeys = catalogClustering != null;
+
+    // Check each definition on its own first, query before catalog, so that the
+    // comparison below works with keys that are known to be sane.
+    if (hasQueryKeys) {
+      validateClusteredBy(sourceType, timeColumnIndex, queryClustering);
+    }
+    if (hasCatalogKeys) {
+      validateClusteredBy(sourceType, timeColumnIndex, catalogClustering);
+    }
+
+    // With keys on both sides, the two lists have to agree.
+    if (hasQueryKeys && hasCatalogKeys) {
+      verifyQueryClusterByMatchesCatalog(sourceType, catalogClustering, queryClustering);
+    }
+  }
+
+  /**
+   * Validate one CLUSTERED BY list against the SELECT row type. Members can be
+   * any of the following:
+   * <p>
+   * {@code CLUSTERED BY [<ordinal> | <id> | <expr>] DESC?}
+   * <p>
+   * Ordinals and identifiers must resolve to a SELECT column, must not refer to
+   * the time column, and must not refer to the same column twice. Expressions
+   * are skipped here: they are not checked for duplicates.
+   *
+   * @param sourceType      row type of the source SELECT
+   * @param timeColumnIndex position of the time column in the row type, or -1
+   * @param clusteredBy     the keys to validate
+   * @throws IAE if a key is invalid, refers to the time column, or is a duplicate
+   */
+  private void validateClusteredBy(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final SqlNodeList clusteredBy
+  )
+  {
+    final List<String> columns = sourceType.getFieldNames();
+    // One flag per SELECT column, set once a key has referenced that column.
+    final boolean[] seen = new boolean[columns.size()];
+
+    for (SqlNode clusterKey : clusteredBy) {
+      final Pair<Integer, Boolean> resolved = resolveClusterKey(clusterKey, columns);
+      if (resolved == null) {
+        // An expression: validation was done in the ORDER BY check.
+        continue;
+      }
+
+      // A column reference: do the additional MSQ-specific checks.
+      final int position = resolved.left;
+      if (position == timeColumnIndex) {
+        // Can't cluster by __time
+        throw new IAE("Do not include %s in the CLUSTERED BY clause: it is managed by PARTITIONED BY", Columns.TIME_COLUMN);
+      }
+      if (seen[position]) {
+        throw new IAE("Duplicate CLUSTERED BY key: %s", clusterKey);
+      }
+      seen[position] = true;
+    }
+  }
+
+  private Pair<Integer, Boolean> resolveClusterKey(SqlNode clusterKey, final List<String> fieldNames)
+  {
+    boolean desc = false;
+
+    // Check if the key is compound: only occurs for DESC. The ASC
+    // case is abstracted away by the parser.
+    if (clusterKey instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = (SqlBasicCall) clusterKey;
+      if (basicCall.getOperator() == SqlStdOperatorTable.DESC) {
+        // Cluster key is compound: CLUSTERED BY foo DESC
+        // We check only the first element
+        clusterKey = ((SqlBasicCall) clusterKey).getOperandList().get(0);
+        desc = true;
+      }
+    }
+
+    // We now have the actual key. Handle the three cases.
+    if (clusterKey instanceof SqlNumericLiteral) {
+      // Key is an ordinal: CLUSTERED BY 2
+      // Ordinals are 1-based.
+      int ord = ((SqlNumericLiteral) clusterKey).intValue(true);
+      int index = ord - 1;
+
+      // The ordinal has to be in range.
+      if (index < 0 || fieldNames.size() <= index) {
+        throw new IAE("CLUSTERED BY ordinal %d is not valid", ord);
+      }
+      return new Pair<>(index, desc);
+    } else if (clusterKey instanceof SqlIdentifier) {
+      // Key is an identifier: CLUSTERED BY foo
+      SqlIdentifier key = (SqlIdentifier) clusterKey;
+
+      // Only key of the form foo are allowed, not foo.bar
+      if (!key.isSimple()) {
+        throw new IAE("CLUSTERED BY keys must be a simple name: '%s'", key.toString());
+      }
+
+      // The name must match an item in the select list
+      String keyName = key.names.get(0);
+      // Slow linear search. We assume that there are not many cluster keys.
+      int index = fieldNames.indexOf(keyName);
+      if (index == -1) {
+        throw new IAE("CLUSTERED BY key column '%s' is not valid", keyName);
+      }
+      return new Pair<>(index, desc);
+    } else {
+      // Key is an expression: CLUSTERED BY CEIL(m2)
+      return null;
+    }
+  }
+
+  /**
+   * Both the catalog and query define clustering. This is allowed as long as they
+   * are identical.
+   */
+  private void verifyQueryClusterByMatchesCatalog(
+      final RelRecordType sourceType,
+      final SqlNodeList catalogClustering,
+      final SqlNodeList clusteredBy
+  )
+  {
+    if (clusteredBy.size() != catalogClustering.size()) {
+      throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+    }
+    List<String> fieldNames = sourceType.getFieldNames();
+    for (int i = 0; i < clusteredBy.size(); i++) {
+      SqlNode catalogKey = catalogClustering.get(i);
+      SqlNode clusterKey = clusteredBy.get(i);
+      Pair<Integer, Boolean> catalogPair = resolveClusterKey(catalogKey, fieldNames);
+      Pair<Integer, Boolean> queryPair = resolveClusterKey(clusterKey, fieldNames);
+
+      // Cluster keys in the catalog must be field references. If unresolved,
+      // we would have gotten an error above. Here we make sure that both
+      // indexes are the same. Since the catalog index can't be null, we're
+      // essentially checking that the indexes are the same: they name the same
+      // column.
+      if (!Objects.equals(catalogPair, queryPair)) {
+        throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+      }
+    }
+  }
+
+  /**
+   * Build the exception that reports a difference between the catalog's cluster
+   * keys and the query's. The exception is returned, not thrown, so that callers
+   * can write {@code throw clusterKeyMismatchException(...)} and the compiler
+   * sees that control flow ends there.
+   *
+   * @param catalogClustering cluster keys from the catalog
+   * @param clusterKeys       cluster keys from the query
+   * @return the exception for the caller to throw
+   */
+  private RuntimeException clusterKeyMismatchException(SqlNodeList catalogClustering, SqlNodeList clusterKeys)
+  {
+    return new IAE(
+        "CLUSTER BY mismatch. Catalog: [%s], query: [%s]",
+        catalogClustering,
+        clusterKeys
+    );
+  }
+
+  /**
+   * Compute and validate the target type. In normal SQL, the engine would insert
+   * a project operator after the SELECT before the write to cast columns from the
+   * input type to the (compatible) defined output type. Druid doesn't work that way.
+   * In MSQ, the output type is just the input type. If the user wants to control the
+   * output type, then the user must manually insert any required CAST: Druid is not
+   * in the business of changing the type to suit the catalog.
+   * <p>
+   * As a result, we first propagate column names and types using Druid rules: the
+   * output is exactly what SELECT says it is. We then apply restrictions from the
+   * catalog. If the table is strict, only column names from the catalog can be
+   * used.
+   */
+  private RelDataType validateTargetType(SqlInsert insert, RelRecordType sourceType, DatasourceFacade tableMetadata)
+  {
+    final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      // Check that there are no unnamed columns in the insert.
+      if (UNNAMED_COLUMN_PATTERN.matcher(colName).matches()) {
+        throw new IAE("Expressions must provide an alias to specify the target column: func(X) AS myColumn");
+      }

Review Comment:
   This was carried over from previous code. Calcite has pulled out the field names for us and we found one of the form [`EXPR$n`]. We don't have the expression, and the user has no idea what [`EXPR$n`] means. I believe the list of expressions is in order, so I added `Expression [%d] must provide....`



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap(); // Query context map; this validator writes ingest settings into it.
+    CatalogResolver catalog(); // Catalog access; asked whether ingest requires the target table to exist.
+    String druidSchemaName(); // Schema name that the first part of a two-part target name must equal.
+    ObjectMapper jsonMapper(); // Used to serialize the resolved segment granularity to a JSON string.
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * Side effects: the validated row type is set on the {@code insert} node, and the
+   * segment granularity and (when the catalog defines it) the target rows per segment
+   * are written into the query context map.
+   *
+   * @param insert the statement node to validate; it is cast to {@link DruidSqlIngest}
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Resolve the segment granularity from the catalog and the query's
+   * {@code PARTITIONED BY} clause. At least one must provide a value; if both
+   * do, the two must be equal. The resolved granularity is serialized to JSON
+   * and written into the query context map.
+   *
+   * @param operationName statement name used in error messages
+   * @param ingestNode    the ingest statement that carries {@code PARTITIONED BY}
+   * @param tableMetadata catalog metadata for the target, or {@code null} if none
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Keep the serialization failure as the cause so it can be diagnosed.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  /**
+   * Render a granularity for use in an error message.
+   *
+   * @param gran the granularity, possibly {@code null}
+   * @return {@code "NULL"} for a null value, {@code "ALL TIME"} for the ALL
+   *         granularity, the period string for a period granularity, else
+   *         the granularity's {@code toString()} value
+   */
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    // Compare with equals(), as validateSegmentGranularity() does, rather than by
+    // identity, so an equal-but-distinct instance is still reported as ALL TIME.
+    if (Granularities.ALL.equals(gran)) {
+      return "ALL TIME";
+    }
+    if (!(gran instanceof PeriodGranularity)) {
+      return gran.toString();
+    }
+    return ((PeriodGranularity) gran).getPeriod().toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }
+
+    if (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    if (source instanceof SqlSelect) {
+      SqlSelect select = (SqlSelect) source;
+      orderByList = select.getOrderList();
+      if (orderByList != null && orderByList.size() != 0) {
+        throw new IAE(
+            "ORDER BY is not supported within %s %s statement, use CLUSTERED BY instead.",
+            statementArticle(operationName),
+            operationName
+        );
+      }
+    } else {
+      throw new IAE(
+          "%s is not supported within %s %s statement.",
+          source.getKind(),
+          statementArticle(operationName),
+          operationName
+      );
+    }
+  }
+
+  /**
+   * Pick the indefinite article that precedes the statement name in an
+   * error message: "an" for {@code INSERT}, "a" for anything else.
+   */
+  private String statementArticle(String operationName)
+  {
+    if ("INSERT".equals(operationName)) {
+      return "an";
+    }
+    return "a";
+  }
+
+  private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null, SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+          );
+      SqlNode keyNode;
+      if (keyCol.desc()) {
+        keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent);
+      } else {
+        keyNode = colIdent;
+      }
+      keyNodes.add(keyNode);
+    }
+    return keyNodes;
+  }
+
+  /**
+   * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as
+   * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation
+   * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back
+   * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not
+   * actually specify (it is an error to do so). However, with the current hybrid structure, it is
+   * not possible to add the ORDER BY later: doing so requires access to the order by namespace
+   * which is not visible to subclasses.
+   * <p>
+   * The query's CLUSTERED BY list is used when it is non-empty; otherwise the catalog clustering
+   * is used. If both are null or empty, the source is left untouched.
+   *
+   * @param source            the source query; any enclosing WITH nodes are unwrapped to reach the SELECT
+   * @param ingestNode        the ingest statement that carries the CLUSTERED BY list
+   * @param catalogClustering cluster keys from the catalog, or {@code null} if none
+   */
+  private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, SqlNodeList catalogClustering)
+  {
+    SqlNodeList clusteredBy = ingestNode.getClusteredBy();
+    if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
+      if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
+        return;
+      }
+      clusteredBy = catalogClustering;
+    }
+    while (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    SqlSelect select = (SqlSelect) source;
+
+    // This part is a bit sad. By the time we get here, the validator will have created
+    // the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the
+    // work that registerQuery() should have done. That's kind of OK. But, the orderScopes
+    // variable is private, so we have to play dirty tricks to get at it.
+    select.setOrderBy(clusteredBy);
+    OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select);
+    try {
+      @SuppressWarnings("unchecked")
+      Map<SqlSelect, SqlValidatorScope> orderScopes = (Map<SqlSelect, SqlValidatorScope>) FieldUtils.getDeclaredField(SqlValidatorImpl.class, "orderScopes", true).get(this);
+      orderScopes.put(select, orderScope);
+    }
+    catch (Exception e) {
+      throw new ISE(e, "orderScopes is not accessible");
+    }
+  }
+
+  private void validateTimeColumn(RelRecordType sourceType, int timeColumnIndex)
+  {
+    RelDataTypeField timeCol = sourceType.getFieldList().get(timeColumnIndex);
+    RelDataType timeColType = timeCol.getType();
+    if (timeColType instanceof BasicSqlType) {
+      BasicSqlType timeColSqlType = (BasicSqlType) timeColType;
+      SqlTypeName timeColSqlTypeName = timeColSqlType.getSqlTypeName();
+      if (timeColSqlTypeName == SqlTypeName.BIGINT || timeColSqlTypeName == SqlTypeName.TIMESTAMP) {
+        return;
+      }
+    }
+    throw new IAE(
+        "Invalid %s column type %s: must be BIGINT or TIMESTAMP",
+        timeCol.getName(),
+        timeColType.toString()
+    );
+  }
+
+  /**
+   * Check the clustering, which may be defined by the query, by the catalog, or
+   * by both. Each definition that is present is validated on its own against
+   * the SELECT row type; when both are present they must also match each other.
+   *
+   * @param sourceType        row type of the source query
+   * @param timeColumnIndex   position of the {@code __time} column, or -1 if absent
+   * @param ingestNode        the ingest statement that carries the query's keys
+   * @param catalogClustering cluster keys from the catalog, or {@code null} if none
+   */
+  private void validateClustering(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final DruidSqlIngest ingestNode,
+      final SqlNodeList catalogClustering
+  )
+  {
+    final SqlNodeList queryClustering = ingestNode.getClusteredBy();
+    final boolean queryHasKeys = queryClustering != null;
+    final boolean catalogHasKeys = catalogClustering != null;
+
+    // Validate each definition first so that the comparison below works on sane input.
+    if (queryHasKeys) {
+      validateClusteredBy(sourceType, timeColumnIndex, queryClustering);
+    }
+    if (catalogHasKeys) {
+      // The catalog names the key columns: they must appear in the query.
+      validateClusteredBy(sourceType, timeColumnIndex, catalogClustering);
+    }
+    if (queryHasKeys && catalogHasKeys) {
+      verifyQueryClusterByMatchesCatalog(sourceType, catalogClustering, queryClustering);
+    }
+  }
+
+  /**
+   * Validate the CLUSTERED BY list. Members can be any of the following:
+   * <p>
+   * {@code CLUSTERED BY [<ordinal> | <id> | <expr>] DESC?}
+   * <p>
+   * Ensure that each id exists. Ensure each column is included only once, and
+   * that the {@code __time} column is not used as a key.
+   * Expression keys get no additional checks here, so duplicates among them are not detected.
+   * <p>
+   * This method only checks the keys: it does not modify the query.
+   *
+   * @param sourceType      row type of the source query
+   * @param timeColumnIndex position of the {@code __time} column, or -1 if absent
+   * @param clusteredBy     the cluster keys to check
+   */
+  private void validateClusteredBy(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final SqlNodeList clusteredBy
+  )
+  {
+    // Keep track of fields which have been referenced.
+    final List<String> fieldNames = sourceType.getFieldNames();
+    final int fieldCount = fieldNames.size();
+    final boolean[] refs = new boolean[fieldCount];
+
+    // Process cluster keys
+    for (SqlNode clusterKey : clusteredBy) {
+      Pair<Integer, Boolean> key = resolveClusterKey(clusterKey, fieldNames);
+      // If the key is an expression, the resolved pair is null and nothing more is checked here.
+      // Else, do additional MSQ-specific checks.
+      if (key != null) {
+        int index = key.left;
+        // Can't cluster by __time
+        if (index == timeColumnIndex) {
+          throw new IAE("Do not include %s in the CLUSTERED BY clause: it is managed by PARTITIONED BY", Columns.TIME_COLUMN);
+        }
+        // No duplicate references
+        if (refs[index]) {
+          throw new IAE("Duplicate CLUSTERED BY key: %s", clusterKey);
+        }
+        refs[index] = true;
+      }
+    }
+  }
+
+  private Pair<Integer, Boolean> resolveClusterKey(SqlNode clusterKey, final List<String> fieldNames)
+  {
+    boolean desc = false;
+
+    // Check if the key is compound: only occurs for DESC. The ASC
+    // case is abstracted away by the parser.
+    if (clusterKey instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = (SqlBasicCall) clusterKey;
+      if (basicCall.getOperator() == SqlStdOperatorTable.DESC) {
+        // Cluster key is compound: CLUSTERED BY foo DESC
+        // We check only the first element
+        clusterKey = ((SqlBasicCall) clusterKey).getOperandList().get(0);
+        desc = true;
+      }
+    }
+
+    // We now have the actual key. Handle the three cases.
+    if (clusterKey instanceof SqlNumericLiteral) {
+      // Key is an ordinal: CLUSTERED BY 2
+      // Ordinals are 1-based.
+      int ord = ((SqlNumericLiteral) clusterKey).intValue(true);
+      int index = ord - 1;
+
+      // The ordinal has to be in range.
+      if (index < 0 || fieldNames.size() <= index) {
+        throw new IAE("CLUSTERED BY ordinal %d is not valid", ord);
+      }
+      return new Pair<>(index, desc);
+    } else if (clusterKey instanceof SqlIdentifier) {
+      // Key is an identifier: CLUSTERED BY foo
+      SqlIdentifier key = (SqlIdentifier) clusterKey;
+
+      // Only key of the form foo are allowed, not foo.bar
+      if (!key.isSimple()) {
+        throw new IAE("CLUSTERED BY keys must be a simple name: '%s'", key.toString());
+      }
+
+      // The name must match an item in the select list
+      String keyName = key.names.get(0);
+      // Slow linear search. We assume that there are not many cluster keys.
+      int index = fieldNames.indexOf(keyName);
+      if (index == -1) {
+        throw new IAE("CLUSTERED BY key column '%s' is not valid", keyName);
+      }
+      return new Pair<>(index, desc);
+    } else {
+      // Key is an expression: CLUSTERED BY CEIL(m2)
+      return null;
+    }
+  }
+
+  /**
+   * Both the catalog and query define clustering. This is allowed as long as they
+   * are identical.
+   */
+  private void verifyQueryClusterByMatchesCatalog(
+      final RelRecordType sourceType,
+      final SqlNodeList catalogClustering,
+      final SqlNodeList clusteredBy
+  )
+  {
+    if (clusteredBy.size() != catalogClustering.size()) {
+      throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+    }
+    List<String> fieldNames = sourceType.getFieldNames();
+    for (int i = 0; i < clusteredBy.size(); i++) {
+      SqlNode catalogKey = catalogClustering.get(i);
+      SqlNode clusterKey = clusteredBy.get(i);
+      Pair<Integer, Boolean> catalogPair = resolveClusterKey(catalogKey, fieldNames);
+      Pair<Integer, Boolean> queryPair = resolveClusterKey(clusterKey, fieldNames);
+
+      // Cluster keys in the catalog must be field references. If unresolved,
+      // we would have gotten an error above. Here we make sure that both
+      // indexes are the same. Since the catalog index can't be null, we're
+      // essentially checking that the indexes are the same: they name the same
+      // column.
+      if (!Objects.equals(catalogPair, queryPair)) {
+        throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+      }
+    }
+  }
+
+  /**
+   * Build the error reported when both the catalog and the query define
+   * clustering, but the two definitions differ. The exception is returned
+   * rather than thrown so that call sites can write
+   * {@code throw clusterKeyMismatchException(...)}, which lets the compiler
+   * see that control flow ends at the call site.
+   *
+   * @param catalogClustering cluster keys defined in the catalog
+   * @param clusterKeys       cluster keys given in the query
+   * @return the exception for the caller to throw
+   */
+  private RuntimeException clusterKeyMismatchException(SqlNodeList catalogClustering, SqlNodeList clusterKeys)
+  {
+    return new IAE(
+        "CLUSTERED BY mismatch. Catalog: [%s], query: [%s]",
+        catalogClustering,
+        clusterKeys
+    );
+  }
+
+  /**
+   * Compute and validate the target type. In normal SQL, the engine would insert
+   * a project operator after the SELECT before the write to cast columns from the
+   * input type to the (compatible) defined output type. Druid doesn't work that way.
+   * In MSQ, the output type is just the input type. If the user wants to control the
+   * output type, then the user must manually insert any required CAST: Druid is not
+   * in the business of changing the type to suit the catalog.
+   * <p>
+   * As a result, we first propagate column names and types using Druid rules: the
+   * output is exactly what SELECT says it is. We then apply restrictions from the
+   * catalog. If the table is strict, only column names from the catalog can be
+   * used.
+   */
+  private RelDataType validateTargetType(SqlInsert insert, RelRecordType sourceType, DatasourceFacade tableMetadata)
+  {
+    final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      // Check that there are no unnamed columns in the insert.
+      if (UNNAMED_COLUMN_PATTERN.matcher(colName).matches()) {
+        throw new IAE("Expressions must provide an alias to specify the target column: func(X) AS myColumn");
+      }
+    }
+    if (tableMetadata == null) {
+      return sourceType;
+    }
+    final boolean isStrict = tableMetadata.isSealed();
+    final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      ColumnFacade definedCol = tableMetadata.column(colName);
+      if (definedCol == null) {
+        // No catalog definition for this column.
+        if (isStrict) {
+          // Table is strict: cannot add new columns at ingest time.
+          throw new IAE(
+              "Target column \"%s\".\"%s\" is not defined",
+              insert.getTargetTable(),
+              colName
+          );
+        }
+
+        // Table is not strict: add a new column based on the SELECT column.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // If the column name is defined, but no type is given then, use the
+      // column type from SELECT.
+      if (!definedCol.hasType()) {
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // Both the column name and type are provided. Use the name and type
+      // from the catalog.
+      // TODO: Handle error if type not found
+      String sqlTypeName = definedCol.sqlStorageType();
+      if (sqlTypeName == null) {
+        // Don't know the storage type. Just skip this one: Druid types are
+        // fluid so let Druid sort out what to store.
+        fields.add(Pair.of(colName, sourceField.getType()));

Review Comment:
   This area is a ~~TODO~~ bit of advice for future readers. The catalog has defined a type that doesn't have a Druid storage type. Can that happen? That is a ~~TODO~~ unresolved issue. Rather than work out the entire type system now (aggregates, arrays, JSON, complex types defined in extensions), we support the basic types so we can get the framework in, then leave a ~~TODO~~ comment for future readers about where to add code later as we complete the type system.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * As a side effect, when the catalog defines a target segment row count, this method
+   * writes it into the query context under {@link #CTX_ROWS_PER_SEGMENT}.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource. A null table means a new datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column, if the SELECT produces one.
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size: pass the catalog's target rows per segment, if any, via the query context.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {

Review Comment:
   Disallowed `WEEK` granularity in MSQ. There is no way to use the `Duration` granularity in MSQ, so `WEEK` is completely disallowed. Neither `'P1W'` nor `'P7D'` is accepted either.



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java:
##########
@@ -19,24 +19,701 @@
 
 package org.apache.druid.sql.calcite.planner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.prepare.BaseDruidSqlValidator;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.IdentifierNamespace;
+import org.apache.calcite.sql.validate.OrderByScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.ValidatorShim;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.facade.DatasourceFacade;
+import org.apache.druid.catalog.model.facade.DatasourceFacade.ColumnFacade;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
 
 /**
- * Druid extended SQL validator. (At present, it doesn't actually
- * have any extensions yet, but it will soon.)
+ * Druid extended SQL validator.
  */
 class DruidSqlValidator extends BaseDruidSqlValidator
 {
+  private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+
+  // Copied here from MSQE since that extension is not visible here.
+  public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
+
+  public interface ValidatorContext
+  {
+    Map<String, Object> queryContextMap();
+    CatalogResolver catalog();
+    String druidSchemaName();
+    ObjectMapper jsonMapper();
+  }
+
+  private final ValidatorContext validatorContext;
+
   protected DruidSqlValidator(
-      SqlOperatorTable opTab,
-      CalciteCatalogReader catalogReader,
-      JavaTypeFactory typeFactory,
-      SqlConformance conformance)
+      final SqlOperatorTable opTab,
+      final CalciteCatalogReader catalogReader,
+      final JavaTypeFactory typeFactory,
+      final SqlConformance conformance,
+      final ValidatorContext validatorContext
+  )
   {
     super(opTab, catalogReader, typeFactory, conformance);
+    this.validatorContext = validatorContext;
+  }
+
+  /**
+   * Druid-specific validation for an INSERT statement. In Druid, the columns are
+   * matched by name. A datasource, by default, allows the insertion of arbitrary columns,
+   * but the catalog may enforce a strict schema (all columns must exist). Destination
+   * types are set by the catalog, where available, else by the query.
+   * <p>
+   * The Druid {@code INSERT} statement is non-standard in a variety of ways:
+   * <ul>
+   * <li>Allows the target table to not yet exist. Instead, {@code INSERT}
+   * creates it.</li>
+   * <li>Does not allow specifying the list of columns:
+   * {@code INSERT INTO dst (a, b, c) ...}</li>
+   * <li>When given without target columns (the only form allowed), columns are
+   * not matched by schema position as in standard SQL, but rather by name.</li>
+   * <li>There is no requirement that the target columns already exist. In fact,
+   * even if the target column exists, any existing type is ignored if not specified
+   * in the catalog.</li>
+   * <li>The source can only be a {@code SELECT} statement, not {@code VALUES}.</li>
+   * <li>Types thus propagate upwards from the {@code SELECT} to the target
+   * table. Standard SQL says that types propagate down from the target table to the
+   * source.</li>
+   * <li>The __time column is special in multiple ways.</li>
+   * <li>Includes the {@code CLUSTERED BY} and {@code PARTITIONED BY} clauses.</li>
+   * </ul>
+   * The result is that the validation for the Druid {@code INSERT} is wildly customized
+   * relative to standard SQL.
+   * <p>
+   * As a side effect, when the catalog defines a target segment row count, this method
+   * writes it into the query context under {@link #CTX_ROWS_PER_SEGMENT}.
+   *
+   * @param insert the statement to validate; it is cast to {@link DruidSqlIngest}
+   */
+  // TODO: Ensure the source and target are not the same
+  @Override
+  public void validateInsert(SqlInsert insert)
+  {
+    DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
+    if (insert.isUpsert()) {
+      throw new IAE("UPSERT is not supported.");
+    }
+
+    // SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
+    String operationName = insert.getOperator().getName();
+    if (insert.getTargetColumnList() != null) {
+      throw new IAE("%s with a target column list is not supported.", operationName);
+    }
+
+    // The target namespace is both the target table ID and the row type for that table.
+    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
+    IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
+
+    // The target is a new or existing datasource. A null table means a new datasource.
+    DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
+
+    // An existing datasource may have metadata.
+    DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
+
+    // Validate segment granularity, which depends on nothing else.
+    validateSegmentGranularity(operationName, ingestNode, tableMetadata);
+
+    // The source must be a SELECT
+    SqlNode source = insert.getSource();
+    validateInsertSelect(source, operationName);
+
+    // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+    SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
+    rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);
+
+    // Validate the source statement. Validates the ORDER BY pushed down in the above step.
+    // Because of the odd Druid semantics, we can't define the target type: we don't know
+    // the target columns yet, and we can't infer types when they must come from the SELECT.
+    // Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
+    // usually defines the target types, unless the catalog says otherwise. Since catalog entries
+    // are optional, we don't know the target type until we validate the SELECT. (Also, we won't
+    // know names and we match by name.) Thus, we'd have to validate (to know names and types)
+    // to get the target types, but we need the target types to validate. Catch-22. So, we punt.
+    if (source instanceof SqlSelect) {
+      final SqlSelect sqlSelect = (SqlSelect) source;
+      validateSelect(sqlSelect, unknownType);
+    } else {
+      final SqlValidatorScope scope = scopes.get(source);
+      validateQuery(source, scope, unknownType);
+    }
+
+    SqlValidatorNamespace sourceNamespace = namespaces.get(source);
+    RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
+
+    // Validate the __time column, if the SELECT produces one.
+    int timeColumnIndex = sourceType.getFieldNames().indexOf(Columns.TIME_COLUMN);
+    if (timeColumnIndex != -1) {
+      validateTimeColumn(sourceType, timeColumnIndex);
+    }
+
+    // Validate clustering against the SELECT row type. Clustering has additional
+    // constraints beyond what was validated for the pushed-down ORDER BY.
+    // Though we pushed down clustering above, only now can we validate it after
+    // we've determined the SELECT row type.
+    validateClustering(sourceType, timeColumnIndex, ingestNode, catalogClustering);
+
+    // Determine the output (target) schema.
+    RelDataType targetType = validateTargetType(insert, sourceType, tableMetadata);
+
+    // Set the type for the INSERT/REPLACE node
+    setValidatedNodeType(insert, targetType);
+
+    // Segment size: pass the catalog's target rows per segment, if any, via the query context.
+    if (tableMetadata != null) {
+      Integer targetSegmentRows = tableMetadata.targetSegmentRows();
+      if (targetSegmentRows != null) {
+        validatorContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
+      }
+    }
+  }
+
+  /**
+   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
+   * or insert into an existing one. If the target exists, it must be a datasource. If it
+   * does not exist, the target must be in the datasource schema, normally "druid".
+   */
+  private DatasourceTable validateInsertTarget(
+      final SqlValidatorNamespace targetNamespace,
+      final IdentifierNamespace insertNs,
+      final String operationName
+  )
+  {
+    // Get the target table ID
+    SqlIdentifier destId = insertNs.getId();
+    if (destId.names.isEmpty()) {
+      // I don't think this can happen, but include a branch for it just in case.
+      throw new IAE("%s requires a target table.", operationName);
+    }
+
+    // Druid does not support 3+ part names.
+    int n = destId.names.size();
+    if (n > 2) {
+      throw new IAE("Table name is undefined: %s", destId.toString());
+    }
+    String tableName = destId.names.get(n - 1);
+
+    // If this is a 2-part name, the first part must be the datasource schema.
+    if (n == 2 && !validatorContext.druidSchemaName().equals(destId.names.get(0))) {
+      throw new IAE("Cannot %s into %s because it is not a Druid datasource.",
+          operationName,
+          destId
+      );
+    }
+    try {
+      // Try to resolve the table. Will fail if this is an INSERT into a new table.
+      validateNamespace(targetNamespace, unknownType);
+      SqlValidatorTable target = insertNs.resolve().getTable();
+      try {
+        return target.unwrap(DatasourceTable.class);
+      }
+      catch (Exception e) {
+        throw new IAE("Cannot %s into %s: it is not a datasource", operationName, destId);
+      }
+    }
+    catch (CalciteContextException e) {
+      // Something failed. Let's make sure it was the table lookup.
+      // The check is kind of a hack, but its the best we can do given that Calcite
+      // didn't expect this non-SQL use case.
+      if (e.getCause() instanceof SqlValidatorException && e.getMessage().contains(StringUtils.format("Object '%s' not found", tableName))) {
+        // The catalog implementation may be "strict": and require that the target
+        // table already exists, rather than the default "lenient" mode that can
+        // create a new table.
+        if (validatorContext.catalog().ingestRequiresExistingTable()) {
+          throw new IAE("Cannot %s into %s because it does not exist", operationName, destId);
+        }
+        // New table. Validate the shape of the name.
+        IdUtils.validateId(operationName + " dataSource", tableName);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Determine the segment granularity for the ingestion from the catalog definition
+   * and the query's {@code PARTITIONED BY} clause. At least one must be present; if
+   * both are present they must be equal. The result is serialized as JSON and stored
+   * in the query context under {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY}.
+   *
+   * @param operationName name of the operation, used in error messages
+   * @param ingestNode    the ingest statement that carries the {@code PARTITIONED BY} clause
+   * @param tableMetadata catalog metadata for the target, or {@code null} if there is none
+   * @throws IAE if no granularity is given, if the catalog and query granularities differ,
+   *             or if the granularity cannot be serialized
+   */
+  private void validateSegmentGranularity(
+      final String operationName,
+      final DruidSqlIngest ingestNode,
+      final DatasourceFacade tableMetadata
+  )
+  {
+    final Granularity definedGranularity = tableMetadata == null ? null : tableMetadata.segmentGranularity();
+    if (definedGranularity != null) {
+      // Should already have been checked when creating the catalog entry
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(definedGranularity);
+    }
+    final Granularity ingestionGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(ingestNode.getPartitionedBy());
+    if (ingestionGranularity != null) {
+      DruidSqlParserUtils.throwIfUnsupportedGranularityInPartitionedBy(ingestionGranularity);
+    }
+    final Granularity finalGranularity;
+    if (definedGranularity == null && ingestionGranularity == null) {
+      // Neither have a value: error
+      throw new IAE(
+          "%s statements must specify a PARTITIONED BY clause explicitly",
+          operationName
+      );
+    } else if (ingestionGranularity == null) {
+      // The query has no granularity: just apply the catalog granularity.
+      finalGranularity = definedGranularity;
+    } else if (definedGranularity == null) {
+      // The catalog has no granularity: apply the query value
+      finalGranularity = ingestionGranularity;
+    } else if (definedGranularity.equals(ingestionGranularity)) {
+      // Both have a setting. They have to be the same.
+      finalGranularity = definedGranularity;
+    } else {
+      throw new IAE(
+          "PARTITIONED BY mismatch. Catalog: [%s], query: [%s]",
+          granularityToSqlString(definedGranularity),
+          granularityToSqlString(ingestionGranularity)
+      );
+    }
+
+    // Note: though this is the validator, we cheat a bit and write the target
+    // granularity into the query context. Perhaps this step should be done
+    // during conversion, however, we've just worked out the granularity, so we
+    // do it here instead.
+    try {
+      validatorContext.queryContextMap().put(
+          DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+          validatorContext.jsonMapper().writeValueAsString(finalGranularity)
+      );
+    }
+    catch (JsonProcessingException e) {
+      // Keep the serialization failure as the cause so it can be diagnosed.
+      throw new IAE(e, "Invalid PARTITIONED BY granularity");
+    }
+  }
+
+  private String granularityToSqlString(Granularity gran)
+  {
+    if (gran == null) {
+      return "NULL";
+    }
+    if (Granularities.ALL == gran) {
+      return "ALL TIME";
+    }
+    if (!(gran instanceof PeriodGranularity)) {
+      return gran.toString();
+    }
+    return ((PeriodGranularity) gran).getPeriod().toString();
+  }
+
+  private void validateInsertSelect(
+      SqlNode source,
+      final String operationName
+  )
+  {
+    // The source SELECT cannot include an ORDER BY clause. Ordering is given
+    // by the CLUSTERED BY clause, if any.
+    // Check that an ORDER BY clause is not provided by the underlying query
+    SqlNodeList orderByList;
+    if (source instanceof SqlOrderBy) {
+      throw new IAE(
+          "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+          statementArticle(operationName),
+          operationName
+      );
+    }
+
+    if (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    if (source instanceof SqlSelect) {
+      SqlSelect select = (SqlSelect) source;
+      orderByList = select.getOrderList();
+      if (orderByList != null && orderByList.size() != 0) {
+        throw new IAE(
+            "ORDER BY is not supported within %s %s statement, use CLUSTERED BY instead.",
+            statementArticle(operationName),
+            operationName
+        );
+      }
+    } else {
+      throw new IAE(
+          "%s is not supported within %s %s statement.",
+          source.getKind(),
+          statementArticle(operationName),
+          operationName
+      );
+    }
+  }
+
+  /**
+   * Choose the indefinite article used before the operation name in error messages:
+   * "an" for {@code INSERT}, "a" for any other operation name.
+   */
+  private String statementArticle(String operationName)
+  {
+    if ("INSERT".equals(operationName)) {
+      return "an";
+    }
+    return "a";
+  }
+
+  private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
+  {
+    if (tableMetadata == null) {
+      return null;
+    }
+    List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
+    if (CollectionUtils.isNullOrEmpty(keyCols)) {
+      return null;
+    }
+    SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
+    for (ClusterKeySpec keyCol : keyCols) {
+      SqlIdentifier colIdent = new SqlIdentifier(
+          Collections.singletonList(keyCol.expr()),
+          null, SqlParserPos.ZERO,
+          Collections.singletonList(SqlParserPos.ZERO)
+          );
+      SqlNode keyNode;
+      if (keyCol.desc()) {
+        keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent);
+      } else {
+        keyNode = colIdent;
+      }
+      keyNodes.add(keyNode);
+    }
+    return keyNodes;
+  }
+
+  /**
+   * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as
+   * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation
+   * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back
+   * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not
+   * actually specify (it is an error to do so.) However, with the current hybrid structure, it is
+   * not possible to add the ORDER BY later: doing so requires access to the order by namespace
+   * which is not visible to subclasses.
+   * <p>
+   * The query's CLUSTERED BY list is used when it is non-empty; otherwise the catalog
+   * clustering is used. If both are null or empty, the source is left untouched.
+   *
+   * @param source            the source query; {@code WITH} wrappers are looked through and
+   *                          the remaining node is cast to {@link SqlSelect}
+   * @param ingestNode        the ingest statement that carries the CLUSTERED BY clause
+   * @param catalogClustering clustering keys from the catalog, or {@code null} if none
+   * @throws ISE if the validator's private {@code orderScopes} map cannot be accessed
+   */
+  private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, SqlNodeList catalogClustering)
+  {
+    SqlNodeList clusteredBy = ingestNode.getClusteredBy();
+    if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
+      if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
+        return;
+      }
+      clusteredBy = catalogClustering;
+    }
+    while (source instanceof SqlWith) {
+      source = ((SqlWith) source).getOperandList().get(1);
+    }
+    SqlSelect select = (SqlSelect) source;
+
+    // This part is a bit sad. By the time we get here, the validator will have created
+    // the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the
+    // work that registerQuery() should have done. That's kind of OK. But, the orderScopes
+    // variable is private, so we have to play dirty tricks to get at it.
+    select.setOrderBy(clusteredBy);
+    OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select);
+    try {
+      @SuppressWarnings("unchecked")
+      Map<SqlSelect, SqlValidatorScope> orderScopes = (Map<SqlSelect, SqlValidatorScope>) FieldUtils.getDeclaredField(SqlValidatorImpl.class, "orderScopes", true).get(this);
+      orderScopes.put(select, orderScope);
+    }
+    catch (Exception e) {
+      throw new ISE(e, "orderScopes is not accessible");
+    }
+  }
+
+  private void validateTimeColumn(RelRecordType sourceType, int timeColumnIndex)
+  {
+    RelDataTypeField timeCol = sourceType.getFieldList().get(timeColumnIndex);
+    RelDataType timeColType = timeCol.getType();
+    if (timeColType instanceof BasicSqlType) {
+      BasicSqlType timeColSqlType = (BasicSqlType) timeColType;
+      SqlTypeName timeColSqlTypeName = timeColSqlType.getSqlTypeName();
+      if (timeColSqlTypeName == SqlTypeName.BIGINT || timeColSqlTypeName == SqlTypeName.TIMESTAMP) {
+        return;
+      }
+    }
+    throw new IAE(
+        "Invalid %s column type %s: must be BIGINT or TIMESTAMP",
+        timeCol.getName(),
+        timeColType.toString()
+    );
+  }
+
+  /**
+   * Validate the clustering keys, which may be given by the query, by the catalog, or by
+   * both. Each list that is present is validated on its own against the SELECT columns
+   * (see {@code validateClusteredBy}). When both are present they must also agree with
+   * each other.
+   *
+   * @param sourceType        row type produced by the source query
+   * @param timeColumnIndex   position of the time column within {@code sourceType}
+   * @param ingestNode        the ingest statement which may carry a CLUSTERED BY clause
+   * @param catalogClustering cluster keys from the catalog, or {@code null} if none
+   */
+  private void validateClustering(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final DruidSqlIngest ingestNode,
+      final SqlNodeList catalogClustering
+  )
+  {
+    final SqlNodeList queryClustering = ingestNode.getClusteredBy();
+    final boolean queryHasKeys = queryClustering != null;
+    final boolean catalogHasKeys = catalogClustering != null;
+
+    // Validate each definition independently first, query before catalog, so that
+    // both are known to be sane before they are compared with one another.
+    if (queryHasKeys) {
+      validateClusteredBy(sourceType, timeColumnIndex, queryClustering);
+    }
+    if (catalogHasKeys) {
+      // The catalog names the key columns: they must appear in the query.
+      validateClusteredBy(sourceType, timeColumnIndex, catalogClustering);
+    }
+
+    // Keys defined in both places: they have to match.
+    if (queryHasKeys && catalogHasKeys) {
+      verifyQueryClusterByMatchesCatalog(sourceType, catalogClustering, queryClustering);
+    }
+  }
+
+  /**
+   * Validate one CLUSTERED BY list. Members can be any of the following:
+   * <p>
+   * {@code CLUSTERED BY [<ordinal> | <id> | <expr>] DESC?}
+   * <p>
+   * Ordinals and identifiers are resolved to a column of the source row type. A resolved
+   * key may not be the time column, and no column may be referenced more than once.
+   * Expression keys are not resolved here and are not checked for duplicates.
+   *
+   * @param sourceType      row type produced by the source query
+   * @param timeColumnIndex position of the time column within {@code sourceType}
+   * @param clusteredBy     the cluster keys to validate
+   * @throws IAE if a key does not resolve, refers to the time column, or repeats a column
+   */
+  private void validateClusteredBy(
+      final RelRecordType sourceType,
+      final int timeColumnIndex,
+      final SqlNodeList clusteredBy
+  )
+  {
+    final List<String> columnNames = sourceType.getFieldNames();
+    // One flag per source column: set once a cluster key has referenced that column.
+    final boolean[] alreadyUsed = new boolean[columnNames.size()];
+
+    for (SqlNode keyNode : clusteredBy) {
+      final Pair<Integer, Boolean> resolved = resolveClusterKey(keyNode, columnNames);
+      if (resolved == null) {
+        // The key is an expression: there is no column index to check here.
+        continue;
+      }
+
+      final int columnIndex = resolved.left;
+      if (columnIndex == timeColumnIndex) {
+        // Can't cluster by __time
+        throw new IAE("Do not include %s in the CLUSTERED BY clause: it is managed by PARTITIONED BY", Columns.TIME_COLUMN);
+      }
+      if (alreadyUsed[columnIndex]) {
+        throw new IAE("Duplicate CLUSTERED BY key: %s", keyNode);
+      }
+      alreadyUsed[columnIndex] = true;
+    }
+  }
+
+  private Pair<Integer, Boolean> resolveClusterKey(SqlNode clusterKey, final List<String> fieldNames)
+  {
+    boolean desc = false;
+
+    // Check if the key is compound: only occurs for DESC. The ASC
+    // case is abstracted away by the parser.
+    if (clusterKey instanceof SqlBasicCall) {
+      SqlBasicCall basicCall = (SqlBasicCall) clusterKey;
+      if (basicCall.getOperator() == SqlStdOperatorTable.DESC) {
+        // Cluster key is compound: CLUSTERED BY foo DESC
+        // We check only the first element
+        clusterKey = ((SqlBasicCall) clusterKey).getOperandList().get(0);
+        desc = true;
+      }
+    }
+
+    // We now have the actual key. Handle the three cases.
+    if (clusterKey instanceof SqlNumericLiteral) {
+      // Key is an ordinal: CLUSTERED BY 2
+      // Ordinals are 1-based.
+      int ord = ((SqlNumericLiteral) clusterKey).intValue(true);
+      int index = ord - 1;
+
+      // The ordinal has to be in range.
+      if (index < 0 || fieldNames.size() <= index) {
+        throw new IAE("CLUSTERED BY ordinal %d is not valid", ord);
+      }
+      return new Pair<>(index, desc);
+    } else if (clusterKey instanceof SqlIdentifier) {
+      // Key is an identifier: CLUSTERED BY foo
+      SqlIdentifier key = (SqlIdentifier) clusterKey;
+
+      // Only key of the form foo are allowed, not foo.bar
+      if (!key.isSimple()) {
+        throw new IAE("CLUSTERED BY keys must be a simple name: '%s'", key.toString());
+      }
+
+      // The name must match an item in the select list
+      String keyName = key.names.get(0);
+      // Slow linear search. We assume that there are not many cluster keys.
+      int index = fieldNames.indexOf(keyName);
+      if (index == -1) {
+        throw new IAE("CLUSTERED BY key column '%s' is not valid", keyName);
+      }
+      return new Pair<>(index, desc);
+    } else {
+      // Key is an expression: CLUSTERED BY CEIL(m2)
+      return null;
+    }
+  }
+
+  /**
+   * Both the catalog and query define clustering. This is allowed as long as they
+   * are identical.
+   */
+  private void verifyQueryClusterByMatchesCatalog(
+      final RelRecordType sourceType,
+      final SqlNodeList catalogClustering,
+      final SqlNodeList clusteredBy
+  )
+  {
+    if (clusteredBy.size() != catalogClustering.size()) {
+      throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+    }
+    List<String> fieldNames = sourceType.getFieldNames();
+    for (int i = 0; i < clusteredBy.size(); i++) {
+      SqlNode catalogKey = catalogClustering.get(i);
+      SqlNode clusterKey = clusteredBy.get(i);
+      Pair<Integer, Boolean> catalogPair = resolveClusterKey(catalogKey, fieldNames);
+      Pair<Integer, Boolean> queryPair = resolveClusterKey(clusterKey, fieldNames);
+
+      // Cluster keys in the catalog must be field references. If unresolved,
+      // we would have gotten an error above. Here we make sure that both
+      // indexes are the same. Since the catalog index can't be null, we're
+      // essentially checking that the indexes are the same: they name the same
+      // column.
+      if (!Objects.equals(catalogPair, queryPair)) {
+        throw clusterKeyMismatchException(catalogClustering, clusteredBy);
+      }
+    }
+  }
+
+  /**
+   * Build the exception which reports that the cluster keys given in the query disagree
+   * with those defined in the catalog. The exception is returned rather than thrown so
+   * that call sites can write {@code throw clusterKeyMismatchException(...)}, which makes
+   * the end of control flow visible at the call site.
+   *
+   * @param catalogClustering cluster keys from the catalog, included in the message
+   * @param clusterKeys       cluster keys from the query, included in the message
+   * @return the exception for the caller to throw
+   */
+  private RuntimeException clusterKeyMismatchException(SqlNodeList catalogClustering, SqlNodeList clusterKeys)
+  {
+    // Return, don't throw: the declared return type and the callers' "throw" both
+    // expect this method to hand the exception back.
+    return new IAE(
+        "CLUSTER BY mismatch. Catalog: [%s], query: [%s]",
+        catalogClustering,
+        clusterKeys
+    );
+  }
+
+  /**
+   * Compute and validate the target type. In normal SQL, the engine would insert
+   * a project operator after the SELECT before the write to cast columns from the
+   * input type to the (compatible) defined output type. Druid doesn't work that way.
+   * In MSQ, the output type is just the input type. If the user wants to control the
+   * output type, then the user must manually insert any required CAST: Druid is not
+   * in the business of changing the type to suit the catalog.
+   * <p>
+   * As a result, we first propagate column names and types using Druid rules: the
+   * output is exactly what SELECT says it is. We then apply restrictions from the
+   * catalog. If the table is strict, only column names from the catalog can be
+   * used.
+   */
+  private RelDataType validateTargetType(SqlInsert insert, RelRecordType sourceType, DatasourceFacade tableMetadata)
+  {
+    final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      // Check that there are no unnamed columns in the insert.
+      if (UNNAMED_COLUMN_PATTERN.matcher(colName).matches()) {
+        throw new IAE("Expressions must provide an alias to specify the target column: func(X) AS myColumn");
+      }
+    }
+    if (tableMetadata == null) {
+      return sourceType;
+    }
+    final boolean isStrict = tableMetadata.isSealed();
+    final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
+    for (RelDataTypeField sourceField : sourceFields) {
+      String colName = sourceField.getName();
+      ColumnFacade definedCol = tableMetadata.column(colName);
+      if (definedCol == null) {
+        // No catalog definition for this column.
+        if (isStrict) {
+          // Table is strict: cannot add new columns at ingest time.
+          throw new IAE(
+              "Target column \"%s\".\"%s\" is not defined",
+              insert.getTargetTable(),
+              colName
+          );
+        }
+
+        // Table is not strict: add a new column based on the SELECT column.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // If the column name is defined, but no type is given then, use the
+      // column type from SELECT.
+      if (!definedCol.hasType()) {
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+
+      // Both the column name and type are provided. Use the name and type
+      // from the catalog.
+      // TODO: Handle error if type not found
+      String sqlTypeName = definedCol.sqlStorageType();
+      if (sqlTypeName == null) {
+        // Don't know the storage type. Just skip this one: Druid types are
+        // fluid so let Druid sort out what to store.
+        fields.add(Pair.of(colName, sourceField.getType()));
+        continue;
+      }
+      SqlTypeName sqlType = SqlTypeName.get(sqlTypeName);

Review Comment:
   We need a `RelDataType` object, not a `String`. To do that, we have to use the type factory to create one. The type factory needs an instance of `SqlTypeName`.
   
   We're doing all this to build up the target schema, just as if we got it from an RDBMS catalog. We then ask Calcite to tell us if the source column types are convertible to the target column types, as if this were a classic SQL `INSERT`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org