You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/14 17:02:47 UTC

[GitHub] [druid] rohangarg commented on a diff in pull request #13165: Druid Catalog basics

rohangarg commented on code in PR #13165:
URL: https://github.com/apache/druid/pull/13165#discussion_r991085378


##########
processing/src/main/java/org/apache/druid/segment/column/RowSignature.java:
##########
@@ -32,6 +32,7 @@
 import org.apache.druid.segment.ColumnInspector;
 
 import javax.annotation.Nullable;
+

Review Comment:
   nit: unnecessary change



##########
server/pom.xml:
##########
@@ -313,18 +313,17 @@
             <groupId>com.fasterxml.jackson.module</groupId>
             <artifactId>jackson-module-guice</artifactId>
         </dependency>
+    	<dependency>
+      		<groupId>org.apache.commons</groupId>

Review Comment:
   nit: check formatting



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {

Review Comment:
   nit: could be transformed into a lambda



##########
server/src/main/java/org/apache/druid/catalog/model/ObjectDefn.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.catalog.model.Properties.PropertyDefn;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metadata definition of the metadata objects stored in the catalog. (Yes,
+ * that means that this is meta-meta-data.) Objects consist of a map of
+ * property values (and perhaps other items defined in subclasses.) Each
+ * property is defined by a column metadata object. Objects allow extended
+ * properties which have no definition: the meaning of such properties is
+ * defined elsewhere.
+ */
+public class ObjectDefn
+{
+  private final String name;
+  private final String typeValue;
+  private final Map<String, PropertyDefn> properties;
+
+  public ObjectDefn(
+      final String name,
+      final String typeValue,
+      final List<PropertyDefn> fields
+  )
+  {
+    this.name = name;
+    this.typeValue = typeValue;
+    this.properties = toPropertyMap(fields);
+  }
+
+  protected static Map<String, PropertyDefn> toPropertyMap(final List<PropertyDefn> props)
+  {
+    ImmutableMap.Builder<String, PropertyDefn> builder = ImmutableMap.builder();
+    if (props != null) {
+      for (PropertyDefn prop : props) {
+        builder.put(prop.name(), prop);
+      }
+    }
+    return builder.build();
+  }
+
+  public String name()
+  {
+    return name;
+  }
+
+  /**
+   * The type value is the value of the {@code "type"} field written into the
+   * object's Java or JSON representation. It is akin to the type used by
+   * Jackson.
+   */
+  public String typeValue()
+  {
+    return typeValue;
+  }
+
+  public Map<String, PropertyDefn> properties()
+  {
+    return properties;
+  }
+
+  public PropertyDefn property(String key)
+  {
+    return properties.get(key);
+  }
+
+  /**
+   * Merge the properties for an object using a set of updates in a map. If the
+   * update value is null, then remove the property in the revised set. If the
+   * property is known, use the column definition to merge the values. Else, the
+   * update replaces any existing value.
+   * <p>
+   * This method does not validate the properties, except as needed to do a
+   * merge. A separate validation step is done on the final, merged object.
+   */
+  protected Map<String, Object> mergeProperties(
+      final Map<String, Object> source,
+      final Map<String, Object> update
+  )
+  {
+    if (update == null) {
+      return source;
+    }
+    if (source == null) {
+      return update;
+    }
+    Map<String, Object> merged = new HashMap<>(source);
+    for (Map.Entry<String, Object> entry : update.entrySet()) {
+      if (entry.getValue() == null) {

Review Comment:
   note : null update to a property means removal



##########
server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.java.util.common.IAE;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of table columns. Columns have multiple types

Review Comment:
   I didn't understand the multiple types part of the column - can you please elaborate on that? 



##########
server/src/main/java/org/apache/druid/catalog/model/Parameterized.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import org.apache.druid.catalog.model.table.ExternalSpec;
+
+import java.util.List;
+import java.util.Map;
+
+public interface Parameterized

Review Comment:
   should this be `ParameterizedObject` ?



##########
server/src/main/java/org/apache/druid/catalog/model/SchemaRegistryImpl.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.catalog.model.table.ExternalTableDefn;
+import org.apache.druid.server.security.ResourceType;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Hard-coded schema registry that knows about the well-known, and
+ * a few obscure, Druid schemas. Does not allow for user-defined
+ * schemas, which the rest of Druid would not be able to support.
+ */
+public class SchemaRegistryImpl implements SchemaRegistry
+{
+  // Mimics the definition in ExternalOperatorConvertion
+  // TODO: Change this when ExternalOperatorConvertion changes
+  private String EXTERNAL_RESOURCE = "EXTERNAL";
+
+  public static class SchemaDefnImpl implements SchemaSpec
+  {
+    private final String name;
+    private final String resource;
+    private final Set<String> accepts;
+
+    public SchemaDefnImpl(
+        String name,
+        String resource,
+        Set<String> accepts
+    )
+    {
+      this.name = name;
+      this.resource = resource;
+      this.accepts = accepts;
+    }
+
+    @Override
+    public String name()
+    {
+      return name;
+    }
+
+    @Override
+    public String securityResource()
+    {
+      return resource;
+    }
+
+    @Override
+    public boolean writable()
+    {
+      return accepts != null && !accepts.isEmpty();
+    }
+
+    @Override
+    public boolean accepts(String tableType)
+    {
+      if (accepts == null) {
+        return false;
+      }
+      return accepts.contains(tableType);
+    }
+  }
+
+  private final Map<String, SchemaSpec> builtIns;
+
+  public SchemaRegistryImpl()
+  {
+    builtIns = new HashMap<>();
+    register(new SchemaDefnImpl(
+        TableId.DRUID_SCHEMA,

Review Comment:
   the schema names could be moved to this class itself



##########
server/src/main/java/org/apache/druid/catalog/model/TableMetadata.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.Objects;
+
+/**
+ * REST API level description of a table. Tables have multiple types
+ * as described by subclasses. Stores the operational aspects of a
+ * table, such as its name, creation time, state and spec.
+ *
+ * @see {@link ResolvedTable} for the semantic representation.
+ */
+@PublicApi
+public class TableMetadata
+{
+  public enum TableState
+  {
+    ACTIVE("A"),
+    DELETING("D");
+
+    private final String code;
+
+    TableState(String code)
+    {
+      this.code = code;
+    }
+
+    public String code()
+    {
+      return code;
+    }
+
+    public static TableState fromCode(String code)
+    {
+      for (TableState state : values()) {
+        if (state.code.equals(code)) {
+          return state;
+        }
+      }
+      throw new ISE("Unknown TableState code: " + code);
+    }
+  }
+
+  private final TableId id;
+  private final long creationTime;
+  private final long updateTime;
+  private final TableState state;
+  private final TableSpec spec;

Review Comment:
   should this be either `ResolvedTable` or `TableDefn` instead of `TableSpec`? FWIU, `TableSpec` is meaningless without a definition, but the definition can have a meaning.



##########
server/src/main/java/org/apache/druid/catalog/model/table/ClusterKeySpec.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+public class ClusterKeySpec
+{
+  private final String expr;
+  private final boolean desc;

Review Comment:
   this could probably be an enum, since clustering could be extended beyond sorting as well in future



##########
server/src/main/java/org/apache/druid/catalog/model/table/MeasureTypes.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class MeasureTypes
+{
+  public static final String OPTIONAL = "--";
+
+  public enum BaseType
+  {
+    VARCHAR(ColumnType.STRING),
+    BIGINT(ColumnType.LONG),
+    FLOAT(ColumnType.FLOAT),
+    DOUBLE(ColumnType.DOUBLE);
+
+    public final ColumnType nativeType;
+
+    BaseType(ColumnType nativeType)
+    {
+      this.nativeType = nativeType;
+    }
+  }
+
+  public static class MeasureType
+  {
+    public final String name;
+    public final List<BaseType> argTypes;
+    public final String sqlSeedFn;
+    public final String sqlReducerFn;
+    public final String nativeType;
+    public final String nativeAggFn;
+    public final ColumnType storageType;
+    public final String nativeReducerFn;
+
+    public MeasureType(
+        final String name,
+        final List<BaseType> argTypes,
+        final String sqlSeedFn,
+        final String sqlReducerFn,
+        final String nativeType,
+        final ColumnType storageType,
+        final String nativeAggFn,
+        final String nativeReducerFn
+    )
+    {
+      this.name = name;
+      this.argTypes = argTypes == null ? Collections.emptyList() : argTypes;
+      this.sqlSeedFn = sqlSeedFn;
+      this.sqlReducerFn = sqlReducerFn;
+      this.nativeType = nativeType;
+      this.storageType = storageType;
+      this.nativeAggFn = nativeAggFn;
+      this.nativeReducerFn = nativeReducerFn;
+    }
+
+    @Override
+    public String toString()
+    {
+      StringBuilder buf = new StringBuilder()
+          .append(name)
+          .append("(");
+      for (int i = 0; i < argTypes.size(); i++) {
+        if (i > 0) {
+          buf.append(", ");
+        }
+        buf.append(argTypes.get(i).name());
+      }
+      return buf.append(")").toString();
+    }
+  }
+
+  // See: https://druid.apache.org/docs/latest/querying/aggregations.html
+  public static final MeasureType COUNT_TYPE = new MeasureType(
+      "COUNT",
+      null,
+      null,
+      "SUM",
+      "longSum",
+      ColumnType.LONG,
+      "count",
+      "longSum"
+  );
+
+  public static final MeasureType SUM_BIGINT_TYPE = simpleAggType("sum", BaseType.BIGINT);
+  public static final MeasureType SUM_FLOAT_TYPE = simpleAggType("sum", BaseType.FLOAT);
+  public static final MeasureType SUM_DOUBLE_TYPE = simpleAggType("sum", BaseType.DOUBLE);
+  public static final MeasureType MIN_BIGINT_TYPE = simpleAggType("min", BaseType.BIGINT);
+  public static final MeasureType MIN_FLOAT_TYPE = simpleAggType("min", BaseType.FLOAT);
+  public static final MeasureType MIN_DOUBLE_TYPE = simpleAggType("min", BaseType.DOUBLE);
+  public static final MeasureType MAX_BIGINT_TYPE = simpleAggType("max", BaseType.BIGINT);
+  public static final MeasureType MAX_FLOAT_TYPE = simpleAggType("max", BaseType.FLOAT);
+  public static final MeasureType MAX_DOUBLE_TYPE = simpleAggType("max", BaseType.DOUBLE);
+
+  private static MeasureType simpleAggType(String fn, BaseType baseType)
+  {
+    String sqlFn = StringUtils.toUpperCase(fn);
+    String nativeFn = baseType.nativeType.asTypeString() + org.apache.commons.lang3.StringUtils.capitalize(fn);
+    return new MeasureType(
+        sqlFn,
+        Collections.singletonList(baseType),
+        sqlFn,
+        null,
+        sqlFn,
+        baseType.nativeType,
+        nativeFn,
+        nativeFn
+    );
+  }
+
+  private static final List<MeasureType> TYPE_LIST =
+      Arrays.asList(
+          COUNT_TYPE,
+          SUM_BIGINT_TYPE,
+          SUM_FLOAT_TYPE,
+          SUM_DOUBLE_TYPE,
+          MIN_BIGINT_TYPE,
+          MIN_FLOAT_TYPE,
+          MIN_DOUBLE_TYPE,
+          MAX_BIGINT_TYPE,
+          MAX_FLOAT_TYPE,
+          MAX_DOUBLE_TYPE
+      );
+  public static final Map<String, List<MeasureType>> TYPES;
+
+  static {
+    Map<String, List<MeasureType>> map = new HashMap<>();
+    for (MeasureType fn : TYPE_LIST) {
+      List<MeasureType> overloads = map.computeIfAbsent(fn.name, x -> new ArrayList<>());
+      overloads.add(fn);
+    }
+    TYPES = ImmutableMap.<String, List<MeasureType>>builder().putAll(map).build();
+  }
+
+  public static MeasureType parse(String typeStr)
+  {
+    Pattern p = Pattern.compile("(\\w+)(?:\\s*\\((.*)\\))?");

Review Comment:
   not sure if can use `ExprParser` for parsing the function signature - need to check some more on it



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {
+      if (!drop.contains(col.name())) {
+        revised.add(col);
+      }
+    }
+    return revised;
+  }
+
+  /**
+   * Convert a catalog granularity string to the Druid form. Catalog granularities
+   * are either the usual descriptive strings (in any case), or an ISO period.
+   * For the odd interval, the interval name is also accepted (for the other
+   * intervals, the interval name is the descriptive string).
+   */
+  public static Granularity asDruidGranularity(String value)
+  {
+    if (Strings.isNullOrEmpty(value)) {
+      return Granularities.ALL;
+    }
+    Granularity gran = toGranularity(value);
+    if (gran != null) {
+      return gran;
+    }
+
+    try {
+      return new PeriodGranularity(new Period(value), null, null);
+    }
+    catch (IllegalArgumentException e) {
+      throw new IAE(StringUtils.format("%s is an invalid period string", value));
+    }
+  }
+
+  /**
+   * {@code String}-to-{@code List<String>} conversion. The string can contain zero items,
+   * one items, or a list. The list items are separated by a comma and optional
+   * whitespace.
+   */
+  public static List<String> stringToList(String value)
+  {
+    if (value == null) {
+      return null;
+    }
+    return Arrays.asList(value.split(",\\s*"));
+  }
+
+  public static <T> T safeCast(Object value, Class<T> type, String propertyName)
+  {
+    if (value == null) {
+      return null;
+    }
+    try {
+      return type.cast(value);
+    }
+    catch (ClassCastException e) {
+      throw new IAE("Value [%s] is not valid for property %s, expected type %s",
+          value,
+          propertyName,
+          type.getSimpleName()
+      );
+    }
+  }
+
+  public static <T> T safeGet(Map<String, Object> map, String propertyName, Class<T> type)
+  {
+    return safeCast(map.get(propertyName), type, propertyName);
+  }
+
+  public static String stringListToLines(List<String> lines)
+  {
+    if (lines.isEmpty()) {
+      return "";
+    }
+    return String.join("\n", lines) + "\n";
+  }
+
+  public static Set<String> setOf(String...items)
+  {
+    if (items.length == 0) {
+      return null;
+    }
+    return new HashSet<>(Arrays.asList(items));
+  }
+
+  public static byte[] toBytes(ObjectMapper jsonMapper, Object obj)

Review Comment:
   the json related methods could be a part of `JacksonUtils` - and they could be generic by accepting the error message to show as a method parameter. 



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {

Review Comment:
   nit: could be lambda



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {
+      if (!drop.contains(col.name())) {
+        revised.add(col);
+      }
+    }
+    return revised;
+  }
+
+  /**
+   * Convert a catalog granularity string to the Druid form. Catalog granularities
+   * are either the usual descriptive strings (in any case), or an ISO period.
+   * For the odd interval, the interval name is also accepted (for the other
+   * intervals, the interval name is the descriptive string).
+   */
+  public static Granularity asDruidGranularity(String value)
+  {
+    if (Strings.isNullOrEmpty(value)) {
+      return Granularities.ALL;
+    }
+    Granularity gran = toGranularity(value);
+    if (gran != null) {
+      return gran;
+    }
+
+    try {
+      return new PeriodGranularity(new Period(value), null, null);
+    }
+    catch (IllegalArgumentException e) {
+      throw new IAE(StringUtils.format("%s is an invalid period string", value));
+    }
+  }
+
+  /**
+   * {@code String}-to-{@code List<String>} conversion. The string can contain zero items,
+   * one items, or a list. The list items are separated by a comma and optional
+   * whitespace.
+   */
+  public static List<String> stringToList(String value)
+  {
+    if (value == null) {
+      return null;
+    }
+    return Arrays.asList(value.split(",\\s*"));
+  }
+
+  public static <T> T safeCast(Object value, Class<T> type, String propertyName)
+  {
+    if (value == null) {
+      return null;
+    }
+    try {
+      return type.cast(value);
+    }
+    catch (ClassCastException e) {
+      throw new IAE("Value [%s] is not valid for property %s, expected type %s",
+          value,
+          propertyName,
+          type.getSimpleName()
+      );
+    }
+  }
+
+  public static <T> T safeGet(Map<String, Object> map, String propertyName, Class<T> type)
+  {
+    return safeCast(map.get(propertyName), type, propertyName);
+  }
+
+  public static String stringListToLines(List<String> lines)
+  {
+    if (lines.isEmpty()) {
+      return "";
+    }
+    return String.join("\n", lines) + "\n";
+  }
+
+  public static Set<String> setOf(String...items)
+  {
+    if (items.length == 0) {
+      return null;
+    }
+    return new HashSet<>(Arrays.asList(items));
+  }
+
+  public static byte[] toBytes(ObjectMapper jsonMapper, Object obj)
+  {
+    try {
+      return jsonMapper.writeValueAsBytes(obj);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Failed to serialize " + obj.getClass().getSimpleName());
+    }
+  }
+
+  public static <T> T fromBytes(ObjectMapper jsonMapper, byte[] bytes, Class<T> clazz)
+  {
+    try {
+      return jsonMapper.readValue(bytes, clazz);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Failed to deserialize a " + clazz.getSimpleName());
+    }
+  }
+
+  public static String toString(Object obj)
+  {
+    ObjectMapper jsonMapper = new ObjectMapper();

Review Comment:
   should re-use the object mapper or get it as the argument



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);

Review Comment:
   would it make sense to start by just accepting granularity enum names (underscore and replacing underscore with sapce)? That could be automated and also contains most of the common cases



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(

Review Comment:
   something like `difference` would be a better name



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {
+      if (!drop.contains(col.name())) {
+        revised.add(col);
+      }
+    }
+    return revised;
+  }
+
+  /**
+   * Convert a catalog granularity string to the Druid form. Catalog granularities
+   * are either the usual descriptive strings (in any case), or an ISO period.
+   * For the odd interval, the interval name is also accepted (for the other
+   * intervals, the interval name is the descriptive string).
+   */
+  public static Granularity asDruidGranularity(String value)
+  {
+    if (Strings.isNullOrEmpty(value)) {
+      return Granularities.ALL;
+    }
+    Granularity gran = toGranularity(value);
+    if (gran != null) {
+      return gran;
+    }
+
+    try {
+      return new PeriodGranularity(new Period(value), null, null);
+    }
+    catch (IllegalArgumentException e) {
+      throw new IAE(StringUtils.format("%s is an invalid period string", value));
+    }
+  }
+
+  /**
+   * {@code String}-to-{@code List<String>} conversion. The string can contain zero items,
+   * one items, or a list. The list items are separated by a comma and optional
+   * whitespace.
+   */
+  public static List<String> stringToList(String value)
+  {
+    if (value == null) {
+      return null;
+    }
+    return Arrays.asList(value.split(",\\s*"));
+  }
+
+  public static <T> T safeCast(Object value, Class<T> type, String propertyName)
+  {
+    if (value == null) {
+      return null;
+    }
+    try {
+      return type.cast(value);
+    }
+    catch (ClassCastException e) {
+      throw new IAE("Value [%s] is not valid for property %s, expected type %s",
+          value,
+          propertyName,
+          type.getSimpleName()
+      );
+    }
+  }
+
+  public static <T> T safeGet(Map<String, Object> map, String propertyName, Class<T> type)
+  {
+    return safeCast(map.get(propertyName), type, propertyName);
+  }
+
+  public static String stringListToLines(List<String> lines)
+  {
+    if (lines.isEmpty()) {
+      return "";
+    }
+    return String.join("\n", lines) + "\n";
+  }
+
+  public static Set<String> setOf(String...items)

Review Comment:
   this could be replaced by `Sets.newHashSet` or `ImmutableSet.of` in the callers themselves



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {
+      if (!drop.contains(col.name())) {
+        revised.add(col);
+      }
+    }
+    return revised;
+  }
+
+  /**
+   * Convert a catalog granularity string to the Druid form. Catalog granularities
+   * are either the usual descriptive strings (in any case), or an ISO period.
+   * For the odd interval, the interval name is also accepted (for the other
+   * intervals, the interval name is the descriptive string).
+   */
+  public static Granularity asDruidGranularity(String value)
+  {
+    if (Strings.isNullOrEmpty(value)) {
+      return Granularities.ALL;
+    }
+    Granularity gran = toGranularity(value);
+    if (gran != null) {
+      return gran;
+    }
+
+    try {
+      return new PeriodGranularity(new Period(value), null, null);
+    }
+    catch (IllegalArgumentException e) {
+      throw new IAE(StringUtils.format("%s is an invalid period string", value));
+    }
+  }
+
+  /**
+   * {@code String}-to-{@code List<String>} conversion. The string can contain zero items,
+   * one items, or a list. The list items are separated by a comma and optional
+   * whitespace.
+   */
+  public static List<String> stringToList(String value)
+  {
+    if (value == null) {
+      return null;
+    }
+    return Arrays.asList(value.split(",\\s*"));
+  }
+
+  public static <T> T safeCast(Object value, Class<T> type, String propertyName)
+  {
+    if (value == null) {
+      return null;
+    }
+    try {
+      return type.cast(value);
+    }
+    catch (ClassCastException e) {
+      throw new IAE("Value [%s] is not valid for property %s, expected type %s",
+          value,
+          propertyName,
+          type.getSimpleName()
+      );
+    }
+  }
+
+  public static <T> T safeGet(Map<String, Object> map, String propertyName, Class<T> type)
+  {
+    return safeCast(map.get(propertyName), type, propertyName);
+  }
+
+  public static String stringListToLines(List<String> lines)
+  {
+    if (lines.isEmpty()) {
+      return "";
+    }
+    return String.join("\n", lines) + "\n";
+  }
+
+  public static Set<String> setOf(String...items)
+  {
+    if (items.length == 0) {
+      return null;
+    }
+    return new HashSet<>(Arrays.asList(items));
+  }
+
+  public static byte[] toBytes(ObjectMapper jsonMapper, Object obj)
+  {
+    try {
+      return jsonMapper.writeValueAsBytes(obj);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Failed to serialize " + obj.getClass().getSimpleName());
+    }
+  }
+
+  public static <T> T fromBytes(ObjectMapper jsonMapper, byte[] bytes, Class<T> clazz)
+  {
+    try {
+      return jsonMapper.readValue(bytes, clazz);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Failed to deserialize a " + clazz.getSimpleName());
+    }
+  }
+
+  public static String toString(Object obj)
+  {
+    ObjectMapper jsonMapper = new ObjectMapper();
+    try {
+      return jsonMapper.writeValueAsString(obj);
+    }
+    catch (JsonProcessingException e) {
+      throw new ISE("Failed to serialize TableDefn");
+    }
+  }
+
+  public static <T> List<T> concatLists(

Review Comment:
   nit: could be written as  `return Stream.of(base, additions).filter(Objects::nonNull).flatMap(Collection::stream).collect(Collectors.toList());` and the arguments of method should be nullable since that's the main thing which led to this method creation I think



##########
server/src/main/java/org/apache/druid/catalog/model/Properties.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Definition of a top-level property in a catalog object.
+ * Provides a set of typical property definitions. Others can be
+ * created case-by-case.
+ * <p>
+ * Property definitions define the property name, validate the value,
+ * and merge updates. Properties have a type: but the type is implicit
+ * via the validation, as is needed when the type is actually a map
+ * which represents a Java object, or when the value is a list.
+ */
+public interface Properties
+{
+  interface PropertyDefn
+  {
+    String name();
+    String typeName();
+    void validate(Object value, ObjectMapper jsonMapper);
+    Object merge(Object existing, Object update);
+  }
+
+  abstract class BasePropertyDefn implements PropertyDefn
+  {
+    protected final String name;
+
+    public BasePropertyDefn(final String name)
+    {
+      this.name = name;
+    }
+
+    @Override
+    public String name()
+    {
+      return name;
+    }
+
+    @Override
+    public Object merge(Object existing, Object update)
+    {
+      return update == null ? existing : update;
+    }
+
+    @Override
+    public String toString()
+    {
+      return getClass().getSimpleName() + "{"
+          + "name: " + name
+          + ", type: " + typeName()
+          + "}";
+    }
+  }
+
+  class SimplePropertyDefn<T> extends BasePropertyDefn
+  {
+    public final Class<T> valueClass;
+
+    public SimplePropertyDefn(
+        final String name,
+        final Class<T> valueClass
+    )
+    {
+      super(name);
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    public String typeName()
+    {
+      return valueClass.getSimpleName();
+    }
+
+    /**
+     * Convert the value from the deserialized JSON format to the type
+     * required by this field data type. Also used to decode values from
+     * SQL parameters. As a side effect, verifies that the value is of
+     * the correct type.
+     */
+    public T decode(Object value, ObjectMapper jsonMapper)

Review Comment:
   probably `decode` could also be a part of the properties interface



##########
server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.java.util.common.IAE;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of table columns. Columns have multiple types
+ * represented via the type field.
+ */
+@UnstableApi
+public class ColumnSpec
+{
+  private final String type;
+  private final String name;
+  private final String sqlType;

Review Comment:
   I'm surely missing the context here - but wanted to know the reason for keeping typeValue as a separate String and not a native construct like ColumnType? The calcite types are extended so that the users can directly see the native types for the columns. Should catalog also support something like that? 



##########
server/src/main/java/org/apache/druid/catalog/model/ColumnDefn.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.catalog.model.Properties.PropertyDefn;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+public class ColumnDefn extends ObjectDefn
+{
+  /**
+   * Convenience class that holds a column specification and its corresponding
+   * definition. This allows the spec to be a pure "data object" without knowledge
+   * of the metadata representation given by the column definition.
+   */
+  public static class ResolvedColumn
+  {
+    private final ColumnDefn defn;
+    private final ColumnSpec spec;
+
+    public ResolvedColumn(ColumnDefn defn, ColumnSpec spec)
+    {
+      this.defn = defn;
+      this.spec = spec;
+    }
+
+    public ColumnDefn defn()
+    {
+      return defn;
+    }
+
+    public ColumnSpec spec()
+    {
+      return spec;
+    }
+
+    public ResolvedColumn merge(ColumnSpec update)
+    {
+      return new ResolvedColumn(defn, defn.merge(spec, update));
+    }
+
+    public void validate(ObjectMapper jsonMapper)
+    {
+      defn.validate(spec, jsonMapper);
+    }
+  }
+
+  public ColumnDefn(
+      final String name,
+      final String typeValue,
+      final List<PropertyDefn> fields
+  )
+  {
+    super(name, typeValue, fields);
+  }
+
+  public ColumnSpec merge(ColumnSpec spec, ColumnSpec update)
+  {
+    String updateType = update.type();
+    if (updateType != null && !spec.type().equals(updateType)) {
+      throw new IAE("The update type must be null or [%s]", spec.type());
+    }
+    String revisedType = update.sqlType() == null ? spec.sqlType() : update.sqlType();
+    Map<String, Object> revisedProps = mergeProperties(
+        spec.properties(),
+        update.properties()
+    );
+    return new ColumnSpec(spec.type(), spec.name(), revisedType, revisedProps);

Review Comment:
   should we rather use `update.name()` to also allow for column level rename maybe?



##########
server/src/main/java/org/apache/druid/catalog/model/TableDefn.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.catalog.model.Properties.PropertyDefn;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Definition for all tables in the catalog. All tables have both
+ * properties and a schema. Subclasses define specific table types
+ * such as datasources or input tables. Some tables may be parameterized
+ * to allow the table to appear in a SQL table function by implementing
+ * the {@link Parameterized} interface.
+ */
+public class TableDefn extends ObjectDefn
+{
+  /**
+   * Human-readable description of the datasource.
+   */
+  public static final String DESCRIPTION_PROPERTY = "description";
+
+  private final Map<String, ColumnDefn> columnDefns;
+
+  public TableDefn(
+      final String name,
+      final String typeValue,
+      final List<PropertyDefn> properties,
+      final List<ColumnDefn> columnDefns
+  )
+  {
+    super(
+        name,
+        typeValue,
+        CatalogUtils.concatLists(
+            Collections.singletonList(
+                new Properties.StringPropertyDefn(DESCRIPTION_PROPERTY)
+            ),
+            properties
+        )
+    );
+    this.columnDefns = columnDefns == null ? Collections.emptyMap() : toColumnMap(columnDefns);
+  }
+
+  public static Map<String, ColumnDefn> toColumnMap(final List<ColumnDefn> colTypes)
+  {
+    ImmutableMap.Builder<String, ColumnDefn> builder = ImmutableMap.builder();
+    for (ColumnDefn colType : colTypes) {
+      builder.put(colType.typeValue(), colType);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Validate a table spec using the table, field and column definitions defined
+   * here. The column definitions validate the type of each property value using
+   * the object mapper.
+   */
+  public void validate(ResolvedTable table)
+  {
+    validate(table.properties(), table.jsonMapper());
+    if (table.spec().columns() == null) {
+      return;

Review Comment:
   does this mean that we allow defining tables without any columns?



##########
server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ColumnDefn;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.Properties;
+import org.apache.druid.catalog.model.Properties.GranularityPropertyDefn;
+import org.apache.druid.catalog.model.Properties.PropertyDefn;
+import org.apache.druid.catalog.model.Properties.StringListPropertyDefn;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableDefn;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class DatasourceDefn extends TableDefn
+{
+  /**
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
+   */
+  public static final String SEGMENT_GRANULARITY_PROPERTY = "segmentGranularity";
+
+  /**
+   * The target segment size at ingestion and initial compaction.
+   * If unset, then the system setting is used.
+   */
+  public static final String TARGET_SEGMENT_ROWS_PROPERTY = "targetSegmentRows";
+  public static final String CLUSTER_KEYS_PROPERTY = "clusterKeys";
+  public static final String HIDDEN_COLUMNS_PROPERTY = "hiddenColumns";
+
+  /**
+   * Ingestion and auto-compaction rollup granularity. If null, then no
+   * rollup is enabled. Same as {@code queryGranularity} in and ingest spec,
+   * but renamed since this granularity affects rollup, not queries. Can be
+   * overridden at ingestion time. The grain may change as segments evolve:
+   * this is the grain only for ingest.
+   */
+  public static final String ROLLUP_GRANULARITY_PROPERTY = "rollupGranularity";
+
+  public static final String DETAIL_DATASOURCE_TYPE = "detail";
+  public static final String ROLLUP_DATASOURCE_TYPE = "rollup";
+
+  public static final String DETAIL_COLUMN_TYPE = "detail";
+  public static final String DIMENSION_TYPE = "dimension";
+  public static final String MEASURE_TYPE = "measure";
+  public static final String INPUT_COLUMN_TYPE = "input";
+
+  public static class SegmentGranularityFieldDefn extends GranularityPropertyDefn
+  {
+    public SegmentGranularityFieldDefn()
+    {
+      super(SEGMENT_GRANULARITY_PROPERTY);
+    }
+
+    @Override
+    public void validate(Object value, ObjectMapper jsonMapper)
+    {
+      String gran = decode(value, jsonMapper);
+      if (Strings.isNullOrEmpty(gran)) {
+        throw new IAE("Segment granularity is required.");
+      }
+      validateGranularity(gran);
+    }
+  }
+
+  public static class HiddenColumnsDefn extends StringListPropertyDefn
+  {
+    public HiddenColumnsDefn()
+    {
+      super(HIDDEN_COLUMNS_PROPERTY);
+    }
+
+    @Override
+    public void validate(Object value, ObjectMapper jsonMapper)
+    {
+      if (value == null) {
+        return;
+      }
+      List<String> hiddenColumns = decode(value, jsonMapper);
+      for (String col : hiddenColumns) {
+        if (Columns.TIME_COLUMN.equals(col)) {
+          throw new IAE(
+              StringUtils.format("Cannot hide column %s", col)
+          );
+        }
+      }
+    }
+  }
+
+  /**
+   * Definition of a column in a detail (non-rollup) datasource.
+   */
+  public static class DetailColumnDefn extends ColumnDefn
+  {
+    public DetailColumnDefn()
+    {
+      super(
+          "Column",
+          DETAIL_COLUMN_TYPE,
+          null
+      );
+    }
+
+    @Override
+    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
+    {
+      super.validate(spec, jsonMapper);
+      validateScalarColumn(spec);
+    }
+  }
+
+  /**
+   * Definition of a dimension in a rollup datasource.
+   */
+  public static class DimensionDefn extends ColumnDefn
+  {
+    public DimensionDefn()
+    {
+      super(
+          "Dimension",
+          DIMENSION_TYPE,
+          null
+      );
+    }
+
+    @Override
+    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
+    {
+      super.validate(spec, jsonMapper);
+      validateScalarColumn(spec);
+    }
+  }
+
+  /**
+   * Definition of a measure (metric) column.
+   * Types are expressed as compound types: "AGG_FN(ARG_TYPE,...)"
+   * where "AGG_FN" is one of the supported aggregate functions,
+   * and "ARG_TYPE" is zero or more argument types.
+   */
+  public static class MeasureDefn extends ColumnDefn
+  {
+    public MeasureDefn()
+    {
+      super(
+          "Measure",
+          MEASURE_TYPE,
+          null
+      );
+    }
+
+    @Override
+    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
+    {
+      super.validate(spec, jsonMapper);
+      if (spec.sqlType() == null) {
+        throw new IAE("A type is required for measure column " + spec.name());
+      }
+      if (Columns.isTimeColumn(spec.name())) {
+        throw new IAE(StringUtils.format(
+            "%s column cannot be a measure",
+            Columns.TIME_COLUMN
+            ));
+      }
+      MeasureTypes.parse(spec.sqlType());
+    }
+  }
+
+  public static class DetailDatasourceDefn extends DatasourceDefn
+  {
+    public DetailDatasourceDefn()
+    {
+      super(
+          "Detail datasource",
+          DETAIL_DATASOURCE_TYPE,
+          null,
+          Collections.singletonList(new DetailColumnDefn())
+      );
+    }
+  }
+
+  public static class RollupDatasourceDefn extends DatasourceDefn
+  {
+    public RollupDatasourceDefn()
+    {
+      super(
+          "Rollup datasource",
+          ROLLUP_DATASOURCE_TYPE,
+          Collections.singletonList(
+              new Properties.GranularityPropertyDefn(ROLLUP_GRANULARITY_PROPERTY)
+          ),
+          Arrays.asList(
+              new DimensionDefn(),
+              new MeasureDefn()
+          )
+      );
+    }
+  }
+
+  public DatasourceDefn(
+      final String name,
+      final String typeValue,
+      final List<PropertyDefn> properties,
+      final List<ColumnDefn> columnDefns
+  )
+  {
+    super(
+        name,
+        typeValue,
+        CatalogUtils.concatLists(
+            Arrays.asList(
+                new SegmentGranularityFieldDefn(),
+                new Properties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY),
+                new Properties.ListPropertyDefn<ClusterKeySpec>(
+                    CLUSTER_KEYS_PROPERTY,
+                    "cluster keys",
+                    new TypeReference<List<ClusterKeySpec>>() { }
+                ),
+                new HiddenColumnsDefn()
+            ),
+            properties
+        ),
+        columnDefns
+    );
+  }
+
+  public static boolean isDatasource(String tableType)

Review Comment:
   nit : unused methods below



##########
server/src/main/java/org/apache/druid/catalog/model/table/ClusterKeySpec.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+public class ClusterKeySpec
+{
+  private final String expr;
+  private final boolean desc;
+
+  @JsonCreator
+  public ClusterKeySpec(
+      @JsonProperty("column") String expr,
+      @JsonProperty("desc") @Nullable Boolean desc
+  )
+  {
+    this.expr = expr;
+    this.desc = desc != null && desc == true;
+  }
+
+  public ClusterKeySpec(String expr)

Review Comment:
   nit: unused const



##########
server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.model.CatalogUtils;
+import org.apache.druid.catalog.model.ColumnDefn;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.Properties;
+import org.apache.druid.catalog.model.Properties.GranularityPropertyDefn;
+import org.apache.druid.catalog.model.Properties.PropertyDefn;
+import org.apache.druid.catalog.model.Properties.StringListPropertyDefn;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.catalog.model.TableDefn;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class DatasourceDefn extends TableDefn
+{
+  /**
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
+   */
+  public static final String SEGMENT_GRANULARITY_PROPERTY = "segmentGranularity";
+
+  /**
+   * The target segment size at ingestion and initial compaction.
+   * If unset, then the system setting is used.
+   */
+  public static final String TARGET_SEGMENT_ROWS_PROPERTY = "targetSegmentRows";
+  public static final String CLUSTER_KEYS_PROPERTY = "clusterKeys";
+  public static final String HIDDEN_COLUMNS_PROPERTY = "hiddenColumns";
+
+  /**
+   * Ingestion and auto-compaction rollup granularity. If null, then no
+   * rollup is enabled. Same as {@code queryGranularity} in and ingest spec,
+   * but renamed since this granularity affects rollup, not queries. Can be
+   * overridden at ingestion time. The grain may change as segments evolve:
+   * this is the grain only for ingest.
+   */
+  public static final String ROLLUP_GRANULARITY_PROPERTY = "rollupGranularity";
+
+  public static final String DETAIL_DATASOURCE_TYPE = "detail";
+  public static final String ROLLUP_DATASOURCE_TYPE = "rollup";
+
+  public static final String DETAIL_COLUMN_TYPE = "detail";
+  public static final String DIMENSION_TYPE = "dimension";
+  public static final String MEASURE_TYPE = "measure";
+  public static final String INPUT_COLUMN_TYPE = "input";
+
+  public static class SegmentGranularityFieldDefn extends GranularityPropertyDefn
+  {
+    public SegmentGranularityFieldDefn()
+    {
+      super(SEGMENT_GRANULARITY_PROPERTY);
+    }
+
+    @Override
+    public void validate(Object value, ObjectMapper jsonMapper)
+    {
+      String gran = decode(value, jsonMapper);
+      if (Strings.isNullOrEmpty(gran)) {
+        throw new IAE("Segment granularity is required.");
+      }
+      validateGranularity(gran);
+    }
+  }
+
+  public static class HiddenColumnsDefn extends StringListPropertyDefn
+  {
+    public HiddenColumnsDefn()
+    {
+      super(HIDDEN_COLUMNS_PROPERTY);
+    }
+
+    @Override
+    public void validate(Object value, ObjectMapper jsonMapper)
+    {
+      if (value == null) {
+        return;
+      }
+      List<String> hiddenColumns = decode(value, jsonMapper);
+      for (String col : hiddenColumns) {
+        if (Columns.TIME_COLUMN.equals(col)) {
+          throw new IAE(
+              StringUtils.format("Cannot hide column %s", col)
+          );
+        }
+      }
+    }
+  }
+
+  /**
+   * Definition of a column in a detail (non-rollup) datasource.
+   */
+  public static class DetailColumnDefn extends ColumnDefn
+  {
+    public DetailColumnDefn()
+    {
+      super(
+          "Column",
+          DETAIL_COLUMN_TYPE,
+          null
+      );
+    }
+
+    @Override
+    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
+    {
+      super.validate(spec, jsonMapper);
+      validateScalarColumn(spec);
+    }
+  }
+
+  /**
+   * Definition of a dimension in a rollup datasource.
+   */
+  public static class DimensionDefn extends ColumnDefn
+  {
+    public DimensionDefn()
+    {
+      super(
+          "Dimension",
+          DIMENSION_TYPE,
+          null
+      );
+    }
+
+    @Override
+    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
+    {
+      super.validate(spec, jsonMapper);
+      validateScalarColumn(spec);
+    }
+  }
+
+  /**
+   * Definition of a measure (metric) column.
+   * Types are expressed as compound types: "AGG_FN(ARG_TYPE,...)"
+   * where "AGG_FN" is one of the supported aggregate functions,
+   * and "ARG_TYPE" is zero or more argument types.
+   */
+  public static class MeasureDefn extends ColumnDefn
+  {
+    public MeasureDefn()
+    {
+      super(
+          "Measure",
+          MEASURE_TYPE,
+          null
+      );
+    }
+
+    @Override
+    public void validate(ColumnSpec spec, ObjectMapper jsonMapper)
+    {
+      super.validate(spec, jsonMapper);
+      if (spec.sqlType() == null) {
+        throw new IAE("A type is required for measure column " + spec.name());
+      }
+      if (Columns.isTimeColumn(spec.name())) {
+        throw new IAE(StringUtils.format(
+            "%s column cannot be a measure",
+            Columns.TIME_COLUMN
+            ));
+      }
+      MeasureTypes.parse(spec.sqlType());
+    }
+  }
+
+  public static class DetailDatasourceDefn extends DatasourceDefn
+  {
+    public DetailDatasourceDefn()
+    {
+      super(
+          "Detail datasource",
+          DETAIL_DATASOURCE_TYPE,
+          null,
+          Collections.singletonList(new DetailColumnDefn())
+      );
+    }
+  }
+
+  public static class RollupDatasourceDefn extends DatasourceDefn
+  {
+    public RollupDatasourceDefn()
+    {
+      super(
+          "Rollup datasource",
+          ROLLUP_DATASOURCE_TYPE,
+          Collections.singletonList(
+              new Properties.GranularityPropertyDefn(ROLLUP_GRANULARITY_PROPERTY)
+          ),
+          Arrays.asList(
+              new DimensionDefn(),
+              new MeasureDefn()
+          )
+      );
+    }
+  }
+
+  public DatasourceDefn(
+      final String name,
+      final String typeValue,
+      final List<PropertyDefn> properties,
+      final List<ColumnDefn> columnDefns
+  )
+  {
+    super(
+        name,
+        typeValue,
+        CatalogUtils.concatLists(
+            Arrays.asList(
+                new SegmentGranularityFieldDefn(),

Review Comment:
   doubt : does the segment gran and cluster key spec in the DS defn represent the latest configuration for an ingestion like columns? the previous data could have different partitioning and clustering.
   
   also, maybe this could be `SegmentGranularityPropertyDefn`



##########
server/src/main/java/org/apache/druid/catalog/model/Parameterized.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import org.apache.druid.catalog.model.table.ExternalSpec;
+
+import java.util.List;
+import java.util.Map;
+
+public interface Parameterized
+{
+  interface ParameterDefn
+  {
+    String name();
+    Class<?> valueClass();
+  }
+
+  class ParameterImpl implements ParameterDefn
+  {
+    private final String name;
+    private final Class<?> type;
+
+    public ParameterImpl(final String name, final Class<?> type)
+    {
+      this.name = name;
+      this.type = type;
+    }
+
+    @Override
+    public String name()
+    {
+      return name;
+    }
+
+    @Override
+    public Class<?> valueClass()
+    {
+      return type;
+    }
+  }
+
+  List<ParameterDefn> parameters();
+  ParameterDefn parameter(String name);
+  ExternalSpec applyParameters(ResolvedTable table, Map<String, Object> parameters);

Review Comment:
   should this return a new ResolvedTable?



##########
server/src/main/java/org/apache/druid/catalog/model/facade/InputTableFacade.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.facade;
+
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+
+public class InputTableFacade extends TableFacade

Review Comment:
   what would this represent?



##########
server/src/main/java/org/apache/druid/catalog/model/table/ExternalSpec.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.segment.column.RowSignature;
+
+/**
+ * Catalog form of an external table specification used to
+ * pass along the three components needed for an external table
+ * in MSQ ingest.
+ */
+public class ExternalSpec

Review Comment:
   `ExternalTableSpec`



##########
server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.joda.time.Period;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CatalogUtils
+{
+  // Amazing that a parser doesn't already exist...
+  private static final Map<String, Granularity> GRANULARITIES = new HashMap<>();
+
+  static {
+    GRANULARITIES.put("millisecond", Granularities.SECOND);
+    GRANULARITIES.put("second", Granularities.SECOND);
+    GRANULARITIES.put("minute", Granularities.MINUTE);
+    GRANULARITIES.put("5 minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("5 minutes", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("five_minute", Granularities.FIVE_MINUTE);
+    GRANULARITIES.put("10 minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("10 minutes", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("ten_minute", Granularities.TEN_MINUTE);
+    GRANULARITIES.put("15 minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("15 minutes", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("fifteen_minute", Granularities.FIFTEEN_MINUTE);
+    GRANULARITIES.put("30 minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("30 minutes", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("thirty_minute", Granularities.THIRTY_MINUTE);
+    GRANULARITIES.put("hour", Granularities.HOUR);
+    GRANULARITIES.put("6 hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("6 hours", Granularities.SIX_HOUR);
+    GRANULARITIES.put("six_hour", Granularities.SIX_HOUR);
+    GRANULARITIES.put("day", Granularities.DAY);
+    GRANULARITIES.put("week", Granularities.WEEK);
+    GRANULARITIES.put("month", Granularities.MONTH);
+    GRANULARITIES.put("quarter", Granularities.QUARTER);
+    GRANULARITIES.put("year", Granularities.YEAR);
+    GRANULARITIES.put("all", Granularities.ALL);
+  }
+
+  public static Granularity toGranularity(String value)
+  {
+    return GRANULARITIES.get(StringUtils.toLowerCase(value));
+  }
+
+  public static int findColumn(List<ColumnSpec> columns, String colName)
+  {
+    for (int i = 0; i < columns.size(); i++) {
+      if (columns.get(i).name().equals(colName)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static List<String> columnNames(List<ColumnSpec> columns)
+  {
+    return columns
+           .stream()
+           .map(col -> col.name())
+           .collect(Collectors.toList());
+  }
+
+  public static <T extends ColumnSpec> List<T> dropColumns(

Review Comment:
   also this method seems unused



##########
server/src/main/java/org/apache/druid/catalog/model/facade/InputTableFacade.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.facade;
+
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.ResolvedTable;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+
+public class InputTableFacade extends TableFacade

Review Comment:
   I think this represents the `ExternalTable` which is backed by an `InputSource`



##########
server/src/main/java/org/apache/druid/catalog/model/facade/ColumnFacade.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.facade;
+
+import org.apache.druid.catalog.model.ColumnDefn.ResolvedColumn;
+import org.apache.druid.catalog.model.Columns;
+import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.catalog.model.table.MeasureTypes;
+import org.apache.druid.catalog.model.table.MeasureTypes.MeasureType;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.column.ColumnType;
+
+public class ColumnFacade
+{
+  public static class DatasourceColumnFacade extends ColumnFacade

Review Comment:
   unused - will this used in future patches?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org