You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/26 14:54:20 UTC

[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13165: Druid Catalog basics

abhishekagarwal87 commented on code in PR #13165:
URL: https://github.com/apache/druid/pull/13165#discussion_r1002398838


##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;

Review Comment:
   I am wondering if we can use standard HTTP verbs here? 
   CREATE
     - NEW -> POST (idempotent - no)
     - IFNEW -> PUT (idempotent - yes)
   UPDATE
     - REPLACE -> POST (idempotent - no)
     - FORCE -> PUT (idempotent - yes)
     
     I think it will be easier to understand if CREATE and UPDATE are two separate endpoints. 



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.sync.CatalogUpdateNotifier;
+import org.apache.druid.catalog.sync.MetadataCatalog.CatalogListener;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+@Path(CatalogListenerResource.BASE_URL)
+public class CatalogListenerResource
+{
+  public static final String BASE_URL = "/druid/broker/v1/catalog";
+  public static final String SYNC_URL = "/sync";
+
+  private final CatalogListener listener;
+  private final AuthorizerMapper authorizerMapper;
+  private final ObjectMapper smileMapper;
+  private final ObjectMapper jsonMapper;
+
+  @Inject
+  public CatalogListenerResource(
+      final CatalogListener listener,
+      @Smile final ObjectMapper smileMapper,
+      @Json final ObjectMapper jsonMapper,
+      final AuthorizerMapper authorizerMapper)
+  {
+    this.listener = listener;
+    this.authorizerMapper = authorizerMapper;
+    this.smileMapper = smileMapper;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @POST
+  @Path(SYNC_URL)
+  @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  public Response syncTable(
+      final InputStream inputStream,
+      @Context final HttpServletRequest req)
+  {
+    Response resp = checkAuth(req);
+    if (resp != null) {
+      return resp;
+    }
+    final String reqContentType = req.getContentType();
+    final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
+    final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper;
+    TableMetadata tableSpec;
+    try {
+      tableSpec = mapper.readValue(inputStream, TableMetadata.class);
+    }
+    catch (IOException e) {
+      return Response.serverError().entity(e.getMessage()).build();

Review Comment:
   this should be considered a bad request and not server error. 



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogListenerResource.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.sync.CatalogUpdateNotifier;
+import org.apache.druid.catalog.sync.MetadataCatalog.CatalogListener;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+@Path(CatalogListenerResource.BASE_URL)
+public class CatalogListenerResource
+{
+  public static final String BASE_URL = "/druid/broker/v1/catalog";
+  public static final String SYNC_URL = "/sync";
+
+  private final CatalogListener listener;
+  private final AuthorizerMapper authorizerMapper;
+  private final ObjectMapper smileMapper;
+  private final ObjectMapper jsonMapper;
+
+  @Inject
+  public CatalogListenerResource(
+      final CatalogListener listener,
+      @Smile final ObjectMapper smileMapper,
+      @Json final ObjectMapper jsonMapper,
+      final AuthorizerMapper authorizerMapper)
+  {
+    this.listener = listener;
+    this.authorizerMapper = authorizerMapper;
+    this.smileMapper = smileMapper;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @POST
+  @Path(SYNC_URL)
+  @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  public Response syncTable(
+      final InputStream inputStream,
+      @Context final HttpServletRequest req)
+  {
+    Response resp = checkAuth(req);
+    if (resp != null) {
+      return resp;
+    }
+    final String reqContentType = req.getContentType();
+    final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
+    final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper;
+    TableMetadata tableSpec;
+    try {
+      tableSpec = mapper.readValue(inputStream, TableMetadata.class);
+    }
+    catch (IOException e) {
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+    TableSpec spec = tableSpec.spec();
+    if (CatalogUpdateNotifier.TOMBSTONE_TABLE_TYPE.equals(spec.type())) {
+      listener.deleted(tableSpec.id());
+    } else {
+      listener.updated(tableSpec);
+    }
+    return Response.status(Response.Status.ACCEPTED).build();
+  }
+
+  private Response checkAuth(final HttpServletRequest request)
+  {
+    final ResourceAction resourceAction = new ResourceAction(
+        new Resource("CONFIG", ResourceType.CONFIG),
+        Action.WRITE
+    );

Review Comment:
   Hmm. what does this access check mean here? 



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;
+  }
+
+  /**
+   * Create a new table containing the given table specification.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param actionParam What to do if the table already exists.
+   *        {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new},
+   *        then an error is raised if the table exists. If {@code ifNew}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts. The other two options are primarily for use in tests.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response postTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("action") String actionParam,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+    final PostAction action;
+    if (actionParam == null) {
+      action = PostAction.NEW;
+    } else {
+      action = PostAction.valueOf(StringUtils.toUpperCase(actionParam));
+      if (action == null) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Not a valid action: [%s]. Valid actions are new, ifNew, replace, force",
+                actionParam
+            )
+        );
+      }
+    }
+    TableId tableId = TableId.of(dbSchema, name);
+    Response response = authorizeTable(tableId, spec, req);
+    if (response != null) {
+      return response;
+    }
+    TableMetadata table = TableMetadata.newTable(tableId, spec);
+    try {
+      catalog.validate(table);
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+
+    switch (action) {
+      case NEW:
+        return insertTableSpec(table, false);
+      case IFNEW:
+        return insertTableSpec(table, true);
+      case REPLACE:
+        return updateTableSpec(table, version);
+      case FORCE:
+        return addOrUpdateTableSpec(table);
+      default:
+        throw new ISE("Unknown action.");
+    }
+  }
+
+  private Response authorizeTable(TableId tableId, TableSpec spec, final HttpServletRequest req)
+  {
+    // Druid has a fixed set of schemas. Ensure the one provided is valid.
+    Pair<Response, SchemaSpec> result = validateSchema(tableId.schema());
+    if (result.lhs != null) {
+      return result.lhs;
+    }

Review Comment:
   same comment here. 



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;
+  }
+
+  /**
+   * Create a new table containing the given table specification.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param actionParam What to do if the table already exists.
+   *        {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new},
+   *        then an error is raised if the table exists. If {@code ifNew}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts. The other two options are primarily for use in tests.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response postTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("action") String actionParam,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+    final PostAction action;
+    if (actionParam == null) {
+      action = PostAction.NEW;
+    } else {
+      action = PostAction.valueOf(StringUtils.toUpperCase(actionParam));
+      if (action == null) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Not a valid action: [%s]. Valid actions are new, ifNew, replace, force",
+                actionParam
+            )
+        );
+      }
+    }
+    TableId tableId = TableId.of(dbSchema, name);
+    Response response = authorizeTable(tableId, spec, req);

Review Comment:
   a bit of nitpick. this should have been a void method. it's weird to think that if a response is returned, then its an error condition, without knowing what kind of response it is. so it can be void method and an exception can be thrown from inside the method if user doesn't have access. 



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;
+  }
+
+  /**
+   * Create a new table containing the given table specification.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param actionParam What to do if the table already exists.
+   *        {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new},
+   *        then an error is raised if the table exists. If {@code ifNew}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts. The other two options are primarily for use in tests.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response postTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("action") String actionParam,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+    final PostAction action;
+    if (actionParam == null) {
+      action = PostAction.NEW;
+    } else {
+      action = PostAction.valueOf(StringUtils.toUpperCase(actionParam));
+      if (action == null) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Not a valid action: [%s]. Valid actions are new, ifNew, replace, force",
+                actionParam
+            )
+        );
+      }
+    }
+    TableId tableId = TableId.of(dbSchema, name);
+    Response response = authorizeTable(tableId, spec, req);
+    if (response != null) {
+      return response;
+    }
+    TableMetadata table = TableMetadata.newTable(tableId, spec);
+    try {
+      catalog.validate(table);
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+
+    switch (action) {
+      case NEW:
+        return insertTableSpec(table, false);
+      case IFNEW:
+        return insertTableSpec(table, true);
+      case REPLACE:
+        return updateTableSpec(table, version);
+      case FORCE:
+        return addOrUpdateTableSpec(table);
+      default:
+        throw new ISE("Unknown action.");
+    }
+  }
+
+  private Response authorizeTable(TableId tableId, TableSpec spec, final HttpServletRequest req)
+  {
+    // Druid has a fixed set of schemas. Ensure the one provided is valid.
+    Pair<Response, SchemaSpec> result = validateSchema(tableId.schema());
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+
+    // The schema has to be one that allows table definitions.
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot modify schema %s", tableId.schema())
+      );
+    }
+
+    // Table name can't be blank or have spaces
+    if (Strings.isNullOrEmpty(tableId.name())) {
+      return Actions.badRequest(Actions.INVALID, "Table name is required");
+    }
+    if (!tableId.name().equals(tableId.name().trim())) {
+      return Actions.badRequest(Actions.INVALID, "Table name cannot start or end with spaces");
+    }
+
+    // The user has to have permission to modify the table.
+    try {
+      catalog.authorizer().authorizeTable(schema, tableId.name(), Action.WRITE, req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+
+    // Validate the spec, if provided.
+    if (spec != null) {
+
+      // The given table spec has to be valid for the given schema.
+      if (Strings.isNullOrEmpty(spec.type())) {
+        return Actions.badRequest(Actions.INVALID, "Table type is required");
+      }
+
+      if (!schema.accepts(spec.type())) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Cannot create tables of type %s in schema %s",
+                spec.getClass().getSimpleName(),
+                tableId.schema()
+            )
+        );
+      }
+    }
+
+    // Everything checks out, let the request proceed.
+    return null;
+  }
+
+  private Response insertTableSpec(TableMetadata table, boolean ifNew)
+  {
+    try {
+      long createVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(createVersion);
+    }
+    catch (DuplicateKeyException e) {
+      if (!ifNew) {
+        return Actions.badRequest(
+              Actions.DUPLICATE_ERROR,
+              StringUtils.format(
+                  "A table of name %s already exists",
+                  table.id().sqlName()
+              )
+        );
+      } else {
+        return Actions.okWithVersion(0);
+      }
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response updateTableSpec(TableMetadata table, long version)
+  {
+    try {
+      long newVersion = catalog.tables().update(table, version);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (OutOfDateException e) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity(
+              Actions.error(
+                  Actions.DUPLICATE_ERROR,
+                  "The table entry not found or is older than the given version: reload and retry"))
+          .build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response addOrUpdateTableSpec(TableMetadata table)
+  {
+    try {
+      long newVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (DuplicateKeyException e) {
+      // Fall through
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+    try {
+      long newVersion = catalog.tables().update(table, 0);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Update a table within the given schema.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param version An optional table version. If provided, the metadata DB
+   *        entry for the table must be at this exact version or the update
+   *        will fail. (Provides "optimistic locking.") If omitted (that is,
+   *        if zero), then no update conflict change is done.
+   * @param req the HTTP request used for authorization.
+   * @return the new version number of the table
+   */
+  @PUT
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateTableDefn(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+
+    TableDefnRegistry tableRegistry = catalog.tableRegistry();
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        spec,
+        req,
+        (existing) -> tableRegistry.resolve(existing).merge(spec).spec()
+    );
+  }
+
+  private Response incrementalUpdate(
+      TableId tableId,
+      TableSpec newSpec,
+      @Context final HttpServletRequest req,
+      Function<TableSpec, TableSpec> action
+  )
+  {
+    Response response = authorizeTable(tableId, newSpec, req);
+    if (response != null) {
+      return response;
+    }
+    try {
+      long newVersion = catalog.tables().updatePayload(tableId, action);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Move a single column to the start end of the column list, or before or after
+   * another column. Both columns must exist. Returns the version of the table
+   * after the update.

Review Comment:
   ```suggestion
      * Move a single column to the start or end of the column list, or before or after
      * another column. Both columns must exist. Returns the version of the table
      * after the update.
   ```



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;
+  }
+
+  /**
+   * Create a new table containing the given table specification.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param actionParam What to do if the table already exists.
+   *        {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new},
+   *        then an error is raised if the table exists. If {@code ifNew}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts. The other two options are primarily for use in tests.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response postTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("action") String actionParam,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+    final PostAction action;
+    if (actionParam == null) {
+      action = PostAction.NEW;
+    } else {
+      action = PostAction.valueOf(StringUtils.toUpperCase(actionParam));
+      if (action == null) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Not a valid action: [%s]. Valid actions are new, ifNew, replace, force",
+                actionParam
+            )
+        );
+      }
+    }
+    TableId tableId = TableId.of(dbSchema, name);
+    Response response = authorizeTable(tableId, spec, req);
+    if (response != null) {
+      return response;
+    }
+    TableMetadata table = TableMetadata.newTable(tableId, spec);
+    try {
+      catalog.validate(table);
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+
+    switch (action) {
+      case NEW:
+        return insertTableSpec(table, false);
+      case IFNEW:
+        return insertTableSpec(table, true);
+      case REPLACE:
+        return updateTableSpec(table, version);
+      case FORCE:
+        return addOrUpdateTableSpec(table);
+      default:
+        throw new ISE("Unknown action.");
+    }
+  }
+
+  private Response authorizeTable(TableId tableId, TableSpec spec, final HttpServletRequest req)
+  {
+    // Druid has a fixed set of schemas. Ensure the one provided is valid.
+    Pair<Response, SchemaSpec> result = validateSchema(tableId.schema());
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+
+    // The schema has to be one that allows table definitions.
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot modify schema %s", tableId.schema())
+      );
+    }
+
+    // Table name can't be blank or have spaces
+    if (Strings.isNullOrEmpty(tableId.name())) {
+      return Actions.badRequest(Actions.INVALID, "Table name is required");
+    }
+    if (!tableId.name().equals(tableId.name().trim())) {
+      return Actions.badRequest(Actions.INVALID, "Table name cannot start or end with spaces");
+    }
+
+    // The user has to have permission to modify the table.
+    try {
+      catalog.authorizer().authorizeTable(schema, tableId.name(), Action.WRITE, req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+
+    // Validate the spec, if provided.
+    if (spec != null) {
+
+      // The given table spec has to be valid for the given schema.
+      if (Strings.isNullOrEmpty(spec.type())) {
+        return Actions.badRequest(Actions.INVALID, "Table type is required");
+      }
+
+      if (!schema.accepts(spec.type())) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Cannot create tables of type %s in schema %s",
+                spec.getClass().getSimpleName(),
+                tableId.schema()
+            )
+        );
+      }
+    }
+
+    // Everything checks out, let the request proceed.
+    return null;
+  }
+
+  private Response insertTableSpec(TableMetadata table, boolean ifNew)
+  {
+    try {
+      long createVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(createVersion);
+    }
+    catch (DuplicateKeyException e) {
+      if (!ifNew) {
+        return Actions.badRequest(
+              Actions.DUPLICATE_ERROR,
+              StringUtils.format(
+                  "A table of name %s already exists",
+                  table.id().sqlName()
+              )
+        );
+      } else {
+        return Actions.okWithVersion(0);
+      }
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response updateTableSpec(TableMetadata table, long version)
+  {
+    try {
+      long newVersion = catalog.tables().update(table, version);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (OutOfDateException e) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity(
+              Actions.error(
+                  Actions.DUPLICATE_ERROR,
+                  "The table entry not found or is older than the given version: reload and retry"))
+          .build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response addOrUpdateTableSpec(TableMetadata table)
+  {
+    try {
+      long newVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (DuplicateKeyException e) {
+      // Fall through
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+    try {
+      long newVersion = catalog.tables().update(table, 0);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Update a table within the given schema.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param version An optional table version. If provided, the metadata DB
+   *        entry for the table must be at this exact version or the update
+   *        will fail. (Provides "optimistic locking.") If omitted (that is,
+   *        if zero), then no update conflict change is done.
+   * @param req the HTTP request used for authorization.
+   * @return the new version number of the table
+   */
+  @PUT
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateTableDefn(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+
+    TableDefnRegistry tableRegistry = catalog.tableRegistry();
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        spec,
+        req,
+        (existing) -> tableRegistry.resolve(existing).merge(spec).spec()
+    );
+  }
+
+  private Response incrementalUpdate(
+      TableId tableId,
+      TableSpec newSpec,
+      @Context final HttpServletRequest req,
+      Function<TableSpec, TableSpec> action
+  )
+  {
+    Response response = authorizeTable(tableId, newSpec, req);
+    if (response != null) {
+      return response;
+    }
+    try {
+      long newVersion = catalog.tables().updatePayload(tableId, action);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Move a single column to the start end of the column list, or before or after
+   * another column. Both columns must exist. Returns the version of the table
+   * after the update.
+   * <p>
+   * The operation is done atomically so no optimistic locking is required.
+   *
+   * @param dbSchema
+   * @param name
+   * @param command
+   * @param req
+   * @return
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}/moveColumn")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response moveColumn(
+      @PathParam("dbSchema") final String dbSchema,
+      @PathParam("name") final String name,
+      final MoveColumn command,
+      @Context final HttpServletRequest req
+  )
+  {
+    if (command == null) {
+      return Actions.badRequest(Actions.INVALID, "A MoveColumn object is required");
+    }
+    if (Strings.isNullOrEmpty(command.column)) {
+      return Actions.badRequest(Actions.INVALID, "A column name is required");
+    }
+    if (command.where == null) {
+      return Actions.badRequest(Actions.INVALID, "A target location is required");
+    }
+    if ((command.where == Position.BEFORE || command.where == Position.AFTER) && Strings.isNullOrEmpty(command.anchor)) {
+      return Actions.badRequest(Actions.INVALID, "A anchor column is required for BEFORE or AFTER");
+    }
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        null,
+        req,
+        (spec) -> spec.withColumns(command.perform(spec.columns()))
+    );
+  }
+
+  /**
+   * Hide or unhide columns. If both appear, hide takes precedence. Returns the
+   * new table version.
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}/hideColumns")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response hideColumns(
+      @PathParam("dbSchema") final String dbSchema,
+      @PathParam("name") final String name,
+      final HideColumns command,
+      @Context final HttpServletRequest req
+  )
+  {
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        null,
+        req,
+        (spec) -> {
+          if (!AbstractDatasourceDefn.isDatasource(spec.type())) {
+            throw new ISE("hideColumns is supported only for data source specs");
+          }
+          @SuppressWarnings("unchecked")
+          List<String> hiddenProps = (List<String>) spec.properties().get(AbstractDatasourceDefn.HIDDEN_COLUMNS_PROPERTY);
+          return spec.withProperty(
+              AbstractDatasourceDefn.HIDDEN_COLUMNS_PROPERTY,
+              command.perform(hiddenProps)
+          );
+        }
+    );
+  }
+
+  /**
+   * Drop column metadata. Only removes metadata entries, has no effect on the
+   * physical segments. Returns the new table version.
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}/dropColumns")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response dropColumns(
+      @PathParam("dbSchema") final String dbSchema,
+      @PathParam("name") final String name,
+      final List<String> columns,
+      @Context final HttpServletRequest req
+  )
+  {
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        null,
+        req,
+        (spec) -> spec.withColumns(dropColumns(spec.columns(), columns))
+    );
+  }
+
+  private static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {
+      if (!drop.contains(col.name())) {
+        revised.add(col);
+      }
+    }
+    return revised;
+  }
+
+  /**
+   * Retrieves the definition of the given table.
+   * <p>
+   * Returns a 404 (NOT FOUND) error if the table definition does not exist.
+   * Note that this check is only for the <i>definition</i>; the table (or
+   * datasource) itself may exist. Similarly, this call may return a definition
+   * even if there is no datasource of the same name (typically occurs when
+   * the definition is created before the datasource itself.)
+   *
+   * @param dbSchema The Druid schema. The user must have read access.
+   * @param name The name of the table within the schema. The user must have
+   *        read access.
+   * @param req the HTTP request used for authorization.
+   * @return the definition for the table, if any.
+   */
+  @GET
+  @Path("/tables/{dbSchema}/{name}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      @Context final HttpServletRequest req
+  )
+  {
+    Pair<Response, SchemaSpec> result = validateSchema(dbSchema);
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    if (Strings.isNullOrEmpty(name)) {
+      return Actions.badRequest(Actions.INVALID, "Table name is required");
+    }
+    try {
+      catalog.authorizer().authorizeTable(result.rhs, name, Action.READ, req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+    try {
+      TableId tableId = new TableId(dbSchema, name);
+      TableMetadata table = catalog.tables().read(tableId);
+      if (table == null) {
+        return Response.status(Response.Status.NOT_FOUND).build();
+      }
+      return Response.ok().entity(table).build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Retrieves the list of all Druid schema names. At present, Druid does
+   * not impose security on schemas, only tables within schemas.
+   */
+  @GET
+  @Path("/list/schemas/names")

Review Comment:
   can this be just a GET on `/schemas` path. same for all `tables`. For tables of a schema, the GET path will be 
   ```
   /schemas/{schemaName}/tables
   ```



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;
+  }
+
+  /**
+   * Create a new table containing the given table specification.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param actionParam What to do if the table already exists.
+   *        {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new},
+   *        then an error is raised if the table exists. If {@code ifNew}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts. The other two options are primarily for use in tests.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response postTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("action") String actionParam,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+    final PostAction action;
+    if (actionParam == null) {
+      action = PostAction.NEW;
+    } else {
+      action = PostAction.valueOf(StringUtils.toUpperCase(actionParam));
+      if (action == null) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Not a valid action: [%s]. Valid actions are new, ifNew, replace, force",
+                actionParam
+            )
+        );
+      }
+    }
+    TableId tableId = TableId.of(dbSchema, name);
+    Response response = authorizeTable(tableId, spec, req);
+    if (response != null) {
+      return response;
+    }
+    TableMetadata table = TableMetadata.newTable(tableId, spec);
+    try {
+      catalog.validate(table);
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+
+    switch (action) {
+      case NEW:
+        return insertTableSpec(table, false);
+      case IFNEW:
+        return insertTableSpec(table, true);
+      case REPLACE:
+        return updateTableSpec(table, version);
+      case FORCE:
+        return addOrUpdateTableSpec(table);
+      default:
+        throw new ISE("Unknown action.");
+    }
+  }
+
+  private Response authorizeTable(TableId tableId, TableSpec spec, final HttpServletRequest req)
+  {
+    // Druid has a fixed set of schemas. Ensure the one provided is valid.
+    Pair<Response, SchemaSpec> result = validateSchema(tableId.schema());
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+
+    // The schema has to be one that allows table definitions.
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot modify schema %s", tableId.schema())
+      );
+    }
+
+    // Table name can't be blank or have spaces
+    if (Strings.isNullOrEmpty(tableId.name())) {
+      return Actions.badRequest(Actions.INVALID, "Table name is required");
+    }
+    if (!tableId.name().equals(tableId.name().trim())) {
+      return Actions.badRequest(Actions.INVALID, "Table name cannot start or end with spaces");
+    }
+
+    // The user has to have permission to modify the table.
+    try {
+      catalog.authorizer().authorizeTable(schema, tableId.name(), Action.WRITE, req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+
+    // Validate the spec, if provided.
+    if (spec != null) {
+
+      // The given table spec has to be valid for the given schema.
+      if (Strings.isNullOrEmpty(spec.type())) {
+        return Actions.badRequest(Actions.INVALID, "Table type is required");
+      }
+
+      if (!schema.accepts(spec.type())) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Cannot create tables of type %s in schema %s",
+                spec.getClass().getSimpleName(),
+                tableId.schema()
+            )
+        );
+      }
+    }
+
+    // Everything checks out, let the request proceed.
+    return null;
+  }
+
+  private Response insertTableSpec(TableMetadata table, boolean ifNew)
+  {
+    try {
+      long createVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(createVersion);
+    }
+    catch (DuplicateKeyException e) {
+      if (!ifNew) {
+        return Actions.badRequest(
+              Actions.DUPLICATE_ERROR,
+              StringUtils.format(
+                  "A table of name %s already exists",
+                  table.id().sqlName()
+              )
+        );
+      } else {
+        return Actions.okWithVersion(0);
+      }
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response updateTableSpec(TableMetadata table, long version)
+  {
+    try {
+      long newVersion = catalog.tables().update(table, version);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (OutOfDateException e) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity(
+              Actions.error(
+                  Actions.DUPLICATE_ERROR,
+                  "The table entry not found or is older than the given version: reload and retry"))
+          .build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response addOrUpdateTableSpec(TableMetadata table)
+  {
+    try {
+      long newVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (DuplicateKeyException e) {
+      // Fall through
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+    try {
+      long newVersion = catalog.tables().update(table, 0);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Update a table within the given schema.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param version An optional table version. If provided, the metadata DB
+   *        entry for the table must be at this exact version or the update
+   *        will fail. (Provides "optimistic locking.") If omitted (that is,
+   *        if zero), then no update conflict change is done.
+   * @param req the HTTP request used for authorization.
+   * @return the new version number of the table
+   */
+  @PUT
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateTableDefn(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+
+    TableDefnRegistry tableRegistry = catalog.tableRegistry();
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        spec,
+        req,
+        (existing) -> tableRegistry.resolve(existing).merge(spec).spec()
+    );
+  }
+
+  private Response incrementalUpdate(
+      TableId tableId,
+      TableSpec newSpec,
+      @Context final HttpServletRequest req,
+      Function<TableSpec, TableSpec> action
+  )
+  {
+    Response response = authorizeTable(tableId, newSpec, req);
+    if (response != null) {
+      return response;
+    }
+    try {
+      long newVersion = catalog.tables().updatePayload(tableId, action);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Move a single column to the start end of the column list, or before or after
+   * another column. Both columns must exist. Returns the version of the table
+   * after the update.
+   * <p>
+   * The operation is done atomically so no optimistic locking is required.
+   *
+   * @param dbSchema
+   * @param name
+   * @param command
+   * @param req
+   * @return
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}/moveColumn")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response moveColumn(
+      @PathParam("dbSchema") final String dbSchema,
+      @PathParam("name") final String name,
+      final MoveColumn command,
+      @Context final HttpServletRequest req
+  )
+  {
+    if (command == null) {
+      return Actions.badRequest(Actions.INVALID, "A MoveColumn object is required");
+    }
+    if (Strings.isNullOrEmpty(command.column)) {
+      return Actions.badRequest(Actions.INVALID, "A column name is required");
+    }
+    if (command.where == null) {
+      return Actions.badRequest(Actions.INVALID, "A target location is required");
+    }
+    if ((command.where == Position.BEFORE || command.where == Position.AFTER) && Strings.isNullOrEmpty(command.anchor)) {
+      return Actions.badRequest(Actions.INVALID, "A anchor column is required for BEFORE or AFTER");
+    }
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        null,
+        req,
+        (spec) -> spec.withColumns(command.perform(spec.columns()))
+    );
+  }
+
+  /**
+   * Hide or unhide columns. If both appear, hide takes precedence. Returns the
+   * new table version.
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}/hideColumns")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response hideColumns(
+      @PathParam("dbSchema") final String dbSchema,
+      @PathParam("name") final String name,
+      final HideColumns command,
+      @Context final HttpServletRequest req
+  )
+  {
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        null,
+        req,
+        (spec) -> {
+          if (!AbstractDatasourceDefn.isDatasource(spec.type())) {
+            throw new ISE("hideColumns is supported only for data source specs");
+          }
+          @SuppressWarnings("unchecked")
+          List<String> hiddenProps = (List<String>) spec.properties().get(AbstractDatasourceDefn.HIDDEN_COLUMNS_PROPERTY);
+          return spec.withProperty(
+              AbstractDatasourceDefn.HIDDEN_COLUMNS_PROPERTY,
+              command.perform(hiddenProps)
+          );
+        }
+    );
+  }
+
+  /**
+   * Drop column metadata. Only removes metadata entries, has no effect on the
+   * physical segments. Returns the new table version.
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}/dropColumns")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response dropColumns(
+      @PathParam("dbSchema") final String dbSchema,
+      @PathParam("name") final String name,
+      final List<String> columns,
+      @Context final HttpServletRequest req
+  )
+  {
+    return incrementalUpdate(
+        TableId.of(dbSchema, name),
+        null,
+        req,
+        (spec) -> spec.withColumns(dropColumns(spec.columns(), columns))
+    );
+  }
+
+  private static <T extends ColumnSpec> List<T> dropColumns(
+      final List<T> columns,
+      final List<String> toDrop)
+  {
+    if (toDrop == null || toDrop.isEmpty()) {
+      return columns;
+    }
+    Set<String> drop = new HashSet<String>(toDrop);
+    List<T> revised = new ArrayList<>();
+    for (T col : columns) {
+      if (!drop.contains(col.name())) {
+        revised.add(col);
+      }
+    }
+    return revised;
+  }
+
+  /**
+   * Retrieves the definition of the given table.
+   * <p>
+   * Returns a 404 (NOT FOUND) error if the table definition does not exist.
+   * Note that this check is only for the <i>definition</i>; the table (or
+   * datasource) itself may exist. Similarly, this call may return a definition
+   * even if there is no datasource of the same name (typically occurs when
+   * the definition is created before the datasource itself.)
+   *
+   * @param dbSchema The Druid schema. The user must have read access.
+   * @param name The name of the table within the schema. The user must have
+   *        read access.
+   * @param req the HTTP request used for authorization.
+   * @return the definition for the table, if any.
+   */
+  @GET
+  @Path("/tables/{dbSchema}/{name}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      @Context final HttpServletRequest req
+  )
+  {
+    Pair<Response, SchemaSpec> result = validateSchema(dbSchema);
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    if (Strings.isNullOrEmpty(name)) {
+      return Actions.badRequest(Actions.INVALID, "Table name is required");
+    }
+    try {
+      catalog.authorizer().authorizeTable(result.rhs, name, Action.READ, req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+    try {
+      TableId tableId = new TableId(dbSchema, name);
+      TableMetadata table = catalog.tables().read(tableId);
+      if (table == null) {
+        return Response.status(Response.Status.NOT_FOUND).build();
+      }
+      return Response.ok().entity(table).build();
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Retrieves the list of all Druid schema names. At present, Druid does
+   * not impose security on schemas, only tables within schemas.
+   */
+  @GET
+  @Path("/list/schemas/names")

Review Comment:
   there will be a parameter to control whether we return name or full detail. similar APIs already exist in druid. 



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.model.ColumnSpec;
+import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.model.TableDefnRegistry;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.model.table.AbstractDatasourceDefn;
+import org.apache.druid.catalog.storage.Actions;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.storage.HideColumns;
+import org.apache.druid.catalog.storage.MoveColumn;
+import org.apache.druid.catalog.storage.MoveColumn.Position;
+import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException;
+import org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(final CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  private enum PostAction
+  {
+    NEW,
+    IFNEW,
+    REPLACE,
+    FORCE;
+  }
+
+  /**
+   * Create a new table containing the given table specification.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param actionParam What to do if the table already exists.
+   *        {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new},
+   *        then an error is raised if the table exists. If {@code ifNew}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts. The other two options are primarily for use in tests.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response postTable(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("action") String actionParam,
+      @QueryParam("version") long version,
+      @Context final HttpServletRequest req
+  )
+  {
+    final PostAction action;
+    if (actionParam == null) {
+      action = PostAction.NEW;
+    } else {
+      action = PostAction.valueOf(StringUtils.toUpperCase(actionParam));
+      if (action == null) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Not a valid action: [%s]. Valid actions are new, ifNew, replace, force",
+                actionParam
+            )
+        );
+      }
+    }
+    TableId tableId = TableId.of(dbSchema, name);
+    Response response = authorizeTable(tableId, spec, req);
+    if (response != null) {
+      return response;
+    }
+    TableMetadata table = TableMetadata.newTable(tableId, spec);
+    try {
+      catalog.validate(table);
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+
+    switch (action) {
+      case NEW:
+        return insertTableSpec(table, false);
+      case IFNEW:
+        return insertTableSpec(table, true);
+      case REPLACE:
+        return updateTableSpec(table, version);
+      case FORCE:
+        return addOrUpdateTableSpec(table);
+      default:
+        throw new ISE("Unknown action.");
+    }
+  }
+
+  private Response authorizeTable(TableId tableId, TableSpec spec, final HttpServletRequest req)
+  {
+    // Druid has a fixed set of schemas. Ensure the one provided is valid.
+    Pair<Response, SchemaSpec> result = validateSchema(tableId.schema());
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+
+    // The schema has to be one that allows table definitions.
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot modify schema %s", tableId.schema())
+      );
+    }
+
+    // Table name can't be blank or have spaces
+    if (Strings.isNullOrEmpty(tableId.name())) {
+      return Actions.badRequest(Actions.INVALID, "Table name is required");
+    }
+    if (!tableId.name().equals(tableId.name().trim())) {
+      return Actions.badRequest(Actions.INVALID, "Table name cannot start or end with spaces");
+    }
+
+    // The user has to have permission to modify the table.
+    try {
+      catalog.authorizer().authorizeTable(schema, tableId.name(), Action.WRITE, req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+
+    // Validate the spec, if provided.
+    if (spec != null) {
+
+      // The given table spec has to be valid for the given schema.
+      if (Strings.isNullOrEmpty(spec.type())) {
+        return Actions.badRequest(Actions.INVALID, "Table type is required");
+      }
+
+      if (!schema.accepts(spec.type())) {
+        return Actions.badRequest(
+            Actions.INVALID,
+            StringUtils.format(
+                "Cannot create tables of type %s in schema %s",
+                spec.getClass().getSimpleName(),
+                tableId.schema()
+            )
+        );
+      }
+    }
+
+    // Everything checks out, let the request proceed.
+    return null;
+  }
+
+  private Response insertTableSpec(TableMetadata table, boolean ifNew)
+  {
+    try {
+      long createVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(createVersion);
+    }
+    catch (DuplicateKeyException e) {
+      if (!ifNew) {
+        return Actions.badRequest(
+              Actions.DUPLICATE_ERROR,
+              StringUtils.format(
+                  "A table of name %s already exists",
+                  table.id().sqlName()
+              )
+        );
+      } else {
+        return Actions.okWithVersion(0);
+      }
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  private Response updateTableSpec(TableMetadata table, long version)
+  {
+    try {
+      long newVersion = catalog.tables().update(table, version);
+      return Actions.okWithVersion(newVersion);
+    }
+    catch (NotFoundException e) {
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+    catch (OutOfDateException e) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity(
+              Actions.error(
+                  Actions.DUPLICATE_ERROR,
+                  "The table entry not found or is older than the given version: reload and retry"))

Review Comment:
   should this be a duplicate error? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org