You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2022/05/18 00:18:32 UTC
[iceberg] branch master updated: Core: Add request headers to REST client. (#4772)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b37204ae4 Core: Add request headers to REST client. (#4772)
b37204ae4 is described below
commit b37204ae467b710ecf0ace9b1c2b49bcd256ab9b
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue May 17 17:18:28 2022 -0700
Core: Add request headers to REST client. (#4772)
---
.../java/org/apache/iceberg/rest/HTTPClient.java | 53 +++++++++---------
.../java/org/apache/iceberg/rest/RESTCatalog.java | 63 ++++++++++++----------
.../java/org/apache/iceberg/rest/RESTClient.java | 10 ++--
.../apache/iceberg/rest/RESTTableOperations.java | 13 +++--
.../org/apache/iceberg/rest/ResourcePaths.java | 4 ++
.../apache/iceberg/rest/RESTCatalogAdapter.java | 24 +++++----
.../org/apache/iceberg/rest/TestHTTPClient.java | 9 ++--
.../org/apache/iceberg/rest/TestRESTCatalog.java | 30 ++++++++---
8 files changed, 121 insertions(+), 85 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index 4e558852d..70dbe736d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -59,13 +59,12 @@ public class HTTPClient implements RESTClient {
private final String uri;
private final CloseableHttpClient httpClient;
private final ObjectMapper mapper;
- private final Map<String, String> additionalHeaders;
+ private final Map<String, String> baseHeaders;
- private HTTPClient(
- String uri, CloseableHttpClient httpClient, Map<String, String> additionalHeaders) {
+ private HTTPClient(String uri, Map<String, String> baseHeaders) {
this.uri = uri;
- this.httpClient = httpClient != null ? httpClient : HttpClients.createDefault();
- this.additionalHeaders = additionalHeaders != null ? additionalHeaders : ImmutableMap.of();
+ this.httpClient = HttpClients.createDefault();
+ this.baseHeaders = baseHeaders != null ? baseHeaders : ImmutableMap.of();
this.mapper = RESTObjectMapper.mapper();
}
@@ -143,7 +142,8 @@ public class HTTPClient implements RESTClient {
* @return The response entity, parsed and converted to its type T
*/
private <T> T execute(
- Method method, String path, Object requestBody, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
+ Method method, String path, Object requestBody, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler) {
if (path.startsWith("/")) {
throw new RESTException(
"Received a malformed path for a REST request: %s. Paths should not start with /", path);
@@ -154,14 +154,14 @@ public class HTTPClient implements RESTClient {
if (requestBody instanceof Map) {
// encode maps as form data, application/x-www-form-urlencoded
- addRequestHeaders(request, ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ addRequestHeaders(request, headers, ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
request.setEntity(toFormEncoding((Map<?, ?>) requestBody));
} else if (requestBody != null) {
// other request bodies are serialized as JSON, application/json
- addRequestHeaders(request, ContentType.APPLICATION_JSON.getMimeType());
+ addRequestHeaders(request, headers, ContentType.APPLICATION_JSON.getMimeType());
request.setEntity(toJson(requestBody));
} else {
- addRequestHeaders(request, ContentType.APPLICATION_JSON.getMimeType());
+ addRequestHeaders(request, headers, ContentType.APPLICATION_JSON.getMimeType());
}
try (CloseableHttpResponse response = httpClient.execute(request)) {
@@ -197,39 +197,40 @@ public class HTTPClient implements RESTClient {
}
@Override
- public void head(String path, Consumer<ErrorResponse> errorHandler) {
- execute(Method.HEAD, path, null, null, errorHandler);
+ public void head(String path, Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ execute(Method.HEAD, path, null, null, headers, errorHandler);
}
@Override
- public <T extends RESTResponse> T get(String path, Class<T> responseType,
+ public <T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
- return execute(Method.GET, path, null, responseType, errorHandler);
+ return execute(Method.GET, path, null, responseType, headers, errorHandler);
}
@Override
public <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType,
- Consumer<ErrorResponse> errorHandler) {
- return execute(Method.POST, path, body, responseType, errorHandler);
+ Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ return execute(Method.POST, path, body, responseType, headers, errorHandler);
}
@Override
- public <T extends RESTResponse> T delete(String path, Class<T> responseType,
+ public <T extends RESTResponse> T delete(String path, Class<T> responseType, Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
- return execute(Method.DELETE, path, null, responseType, errorHandler);
+ return execute(Method.DELETE, path, null, responseType, headers, errorHandler);
}
@Override
public <T extends RESTResponse> T postForm(String path, Map<String, String> formData, Class<T> responseType,
Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
- return execute(Method.POST, path, formData, responseType, errorHandler);
+ return execute(Method.POST, path, formData, responseType, headers, errorHandler);
}
- private void addRequestHeaders(HttpUriRequest request, String bodyMimeType) {
+ private void addRequestHeaders(HttpUriRequest request, Map<String, String> requestHeaders, String bodyMimeType) {
request.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON.getMimeType());
// Many systems require that content type is set regardless and will fail, even on an empty bodied request.
request.setHeader(HttpHeaders.CONTENT_TYPE, bodyMimeType);
- additionalHeaders.forEach(request::setHeader);
+ baseHeaders.forEach(request::setHeader);
+ requestHeaders.forEach(request::setHeader);
}
@Override
@@ -242,10 +243,8 @@ public class HTTPClient implements RESTClient {
}
public static class Builder {
- private final Map<String, String> additionalHeaders = Maps.newHashMap();
+ private final Map<String, String> baseHeaders = Maps.newHashMap();
private String uri;
- private CloseableHttpClient httpClient;
- private ObjectMapper mapper;
private Builder() {
}
@@ -261,23 +260,23 @@ public class HTTPClient implements RESTClient {
}
public Builder withHeader(String key, String value) {
- additionalHeaders.put(key, value);
+ baseHeaders.put(key, value);
return this;
}
public Builder withHeaders(Map<String, String> headers) {
- additionalHeaders.putAll(headers);
+ baseHeaders.putAll(headers);
return this;
}
public Builder withBearerAuth(String token) {
Preconditions.checkNotNull(token, "Invalid auth token: null");
- additionalHeaders.put(HttpHeaders.AUTHORIZATION, asBearer(token));
+ baseHeaders.put(HttpHeaders.AUTHORIZATION, asBearer(token));
return this;
}
public HTTPClient build() {
- return new HTTPClient(uri, httpClient, additionalHeaders);
+ return new HTTPClient(uri, baseHeaders);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 905a7849c..bb758de94 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -96,6 +96,10 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
this.io = CatalogUtil.loadFileIO(ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf);
}
+ protected Map<String, String> headers() {
+ return ImmutableMap.of();
+ }
+
public Map<String, String> properties() {
return properties;
}
@@ -113,7 +117,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
@Override
public List<TableIdentifier> listTables(Namespace ns) {
ListTablesResponse response = client.get(
- paths.tables(ns), ListTablesResponse.class, ErrorHandlers.namespaceErrorHandler());
+ paths.tables(ns), ListTablesResponse.class, headers(), ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
}
@@ -121,7 +125,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
public boolean dropTable(TableIdentifier identifier, boolean purge) {
// TODO: support purge flagN
try {
- client.delete(paths.table(identifier), null, ErrorHandlers.tableErrorHandler());
+ client.delete(paths.table(identifier), null, headers(), ErrorHandlers.tableErrorHandler());
return true;
} catch (NoSuchTableException e) {
return false;
@@ -136,21 +140,20 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
.build();
// for now, ignore the response because there is no way to return it.
- client.post("v1/tables/rename", request, null, ErrorHandlers.tableErrorHandler());
+ client.post(paths.rename(), request, null, headers(), ErrorHandlers.tableErrorHandler());
}
- private LoadTableResponse loadInternal(TableIdentifier identifier) {
- return client.get(paths.table(identifier), LoadTableResponse.class, ErrorHandlers.tableErrorHandler());
+ private LoadTableResponse loadInternal(TableIdentifier identifier, Map<String, String> headers) {
+ return client.get(paths.table(identifier), LoadTableResponse.class, headers, ErrorHandlers.tableErrorHandler());
}
@Override
public Table loadTable(TableIdentifier identifier) {
- LoadTableResponse response = loadInternal(identifier);
+ LoadTableResponse response = loadInternal(identifier, headers());
Pair<RESTClient, FileIO> clients = tableClients(response.config());
-
- return new BaseTable(
- new RESTTableOperations(clients.first(), paths.table(identifier), clients.second(), response.tableMetadata()),
- fullTableName(identifier));
+ RESTTableOperations ops = new RESTTableOperations(
+ clients.first(), paths.table(identifier), headers(), clients.second(), response.tableMetadata());
+ return new BaseTable(ops, fullTableName(identifier));
}
@Override
@@ -161,7 +164,8 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
.build();
// for now, ignore the response because there is no way to return it
- client.post(paths.namespaces(), request, CreateNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler());
+ client.post(
+ paths.namespaces(), request, CreateNamespaceResponse.class, headers(), ErrorHandlers.namespaceErrorHandler());
}
@Override
@@ -169,7 +173,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
Preconditions.checkArgument(namespace.isEmpty(), "Cannot list namespaces under parent: %s", namespace);
// String joined = NULL.join(namespace.levels());
ListNamespacesResponse response = client
- .get(paths.namespaces(), ListNamespacesResponse.class, ErrorHandlers.namespaceErrorHandler());
+ .get(paths.namespaces(), ListNamespacesResponse.class, headers(), ErrorHandlers.namespaceErrorHandler());
return response.namespaces();
}
@@ -177,14 +181,14 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
public Map<String, String> loadNamespaceMetadata(Namespace ns) throws NoSuchNamespaceException {
// TODO: rename to LoadNamespaceResponse?
GetNamespaceResponse response = client
- .get(paths.namespace(ns), GetNamespaceResponse.class, ErrorHandlers.namespaceErrorHandler());
+ .get(paths.namespace(ns), GetNamespaceResponse.class, headers(), ErrorHandlers.namespaceErrorHandler());
return response.properties();
}
@Override
public boolean dropNamespace(Namespace ns) throws NamespaceNotEmptyException {
try {
- client.delete(paths.namespace(ns), null, ErrorHandlers.namespaceErrorHandler());
+ client.delete(paths.namespace(ns), null, headers(), ErrorHandlers.namespaceErrorHandler());
return true;
} catch (NoSuchNamespaceException e) {
return false;
@@ -198,7 +202,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
.build();
UpdateNamespacePropertiesResponse response = client.post(
- paths.namespaceProperties(ns), request, UpdateNamespacePropertiesResponse.class,
+ paths.namespaceProperties(ns), request, UpdateNamespacePropertiesResponse.class, headers(),
ErrorHandlers.namespaceErrorHandler());
return !response.updated().isEmpty();
@@ -211,7 +215,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
.build();
UpdateNamespacePropertiesResponse response = client.post(
- paths.namespaceProperties(ns), request, UpdateNamespacePropertiesResponse.class,
+ paths.namespaceProperties(ns), request, UpdateNamespacePropertiesResponse.class, headers(),
ErrorHandlers.namespaceErrorHandler());
return !response.removed().isEmpty();
@@ -219,7 +223,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
- return new Builder(identifier, schema);
+ return new Builder(identifier, schema, headers());
}
@Override
@@ -232,14 +236,16 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
private class Builder implements TableBuilder {
private final TableIdentifier ident;
private final Schema schema;
+ private final Map<String, String> headers;
private final ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
private PartitionSpec spec = null;
private SortOrder writeOrder = null;
private String location = null;
- private Builder(TableIdentifier ident, Schema schema) {
+ private Builder(TableIdentifier ident, Schema schema, Map<String, String> headers) {
this.ident = ident;
this.schema = schema;
+ this.headers = headers;
}
@Override
@@ -274,7 +280,6 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
@Override
public Table create() {
- String ns = RESTUtil.encodeNamespace(ident.namespace());
CreateTableRequest request = CreateTableRequest.builder()
.withName(ident.name())
.withSchema(schema)
@@ -285,13 +290,14 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
.build();
LoadTableResponse response = client.post(
- "v1/namespaces/" + ns + "/tables", request, LoadTableResponse.class, ErrorHandlers.tableErrorHandler());
+ paths.tables(ident.namespace()), request, LoadTableResponse.class, headers,
+ ErrorHandlers.tableErrorHandler());
Pair<RESTClient, FileIO> clients = tableClients(response.config());
+ RESTTableOperations ops = new RESTTableOperations(
+ clients.first(), paths.table(ident), headers, clients.second(), response.tableMetadata());
- return new BaseTable(
- new RESTTableOperations(clients.first(), paths.table(ident), clients.second(), response.tableMetadata()),
- fullTableName(ident));
+ return new BaseTable(ops, fullTableName(ident));
}
@Override
@@ -303,7 +309,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
TableMetadata meta = response.tableMetadata();
RESTTableOperations ops = new RESTTableOperations(
- clients.first(), paths.table(ident), clients.second(),
+ clients.first(), paths.table(ident), headers, clients.second(),
RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta);
return Transactions.createTableTransaction(fullName, ops, meta);
@@ -311,7 +317,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
@Override
public Transaction replaceTransaction() {
- LoadTableResponse response = loadInternal(ident);
+ LoadTableResponse response = loadInternal(ident, headers);
String fullName = fullTableName(ident);
Pair<RESTClient, FileIO> clients = tableClients(response.config());
@@ -343,7 +349,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
}
RESTTableOperations ops = new RESTTableOperations(
- clients.first(), paths.table(ident), clients.second(),
+ clients.first(), paths.table(ident), headers, clients.second(),
RESTTableOperations.UpdateType.REPLACE, changes.build(), base);
return Transactions.replaceTableTransaction(fullName, ops, replacement);
@@ -376,7 +382,8 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
.build();
return client.post(
- paths.tables(ident.namespace()), request, LoadTableResponse.class, ErrorHandlers.tableErrorHandler());
+ paths.tables(ident.namespace()), request, LoadTableResponse.class, headers,
+ ErrorHandlers.tableErrorHandler());
}
}
@@ -443,7 +450,7 @@ public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Co
try {
ConfigResponse configResponse = singleUseClient
- .get(ResourcePaths.config(), ConfigResponse.class, ErrorHandlers.defaultErrorHandler());
+ .get(ResourcePaths.config(), ConfigResponse.class, headers(), ErrorHandlers.defaultErrorHandler());
configResponse.validate();
return configResponse;
} finally {
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTClient.java b/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
index a559946f5..777d73ee1 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTClient.java
@@ -29,13 +29,15 @@ import org.apache.iceberg.rest.responses.ErrorResponse;
*/
public interface RESTClient extends Closeable {
- void head(String path, Consumer<ErrorResponse> errorHandler);
+ void head(String path, Map<String, String> headers, Consumer<ErrorResponse> errorHandler);
- <T extends RESTResponse> T delete(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
+ <T extends RESTResponse> T delete(String path, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler);
- <T extends RESTResponse> T get(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler);
+ <T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler);
- <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType,
+ <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType, Map<String, String> headers,
Consumer<ErrorResponse> errorHandler);
<T extends RESTResponse> T postForm(String path, Map<String, String> formData, Class<T> responseType,
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
index 9e3938ff6..1e23d464a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.rest;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.iceberg.LocationProviders;
@@ -48,20 +49,22 @@ class RESTTableOperations implements TableOperations {
private final RESTClient client;
private final String path;
+ private final Map<String, String> headers;
private final FileIO io;
private final List<MetadataUpdate> createChanges;
private final TableMetadata replaceBase;
private UpdateType updateType;
private TableMetadata current;
- RESTTableOperations(RESTClient client, String path, FileIO io, TableMetadata current) {
- this(client, path, io, UpdateType.SIMPLE, Lists.newArrayList(), current);
+ RESTTableOperations(RESTClient client, String path, Map<String, String> headers, FileIO io, TableMetadata current) {
+ this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current);
}
- RESTTableOperations(RESTClient client, String path, FileIO io, UpdateType updateType,
+ RESTTableOperations(RESTClient client, String path, Map<String, String> headers, FileIO io, UpdateType updateType,
List<MetadataUpdate> createChanges, TableMetadata current) {
this.client = client;
this.path = path;
+ this.headers = headers;
this.io = io;
this.updateType = updateType;
this.createChanges = createChanges;
@@ -80,7 +83,7 @@ class RESTTableOperations implements TableOperations {
@Override
public TableMetadata refresh() {
- return updateCurrentMetadata(client.get(path, LoadTableResponse.class, ErrorHandlers.tableErrorHandler()));
+ return updateCurrentMetadata(client.get(path, LoadTableResponse.class, headers, ErrorHandlers.tableErrorHandler()));
}
@Override
@@ -121,7 +124,7 @@ class RESTTableOperations implements TableOperations {
// the error handler will throw necessary exceptions like CommitFailedException and UnknownCommitStateException
// TODO: ensure that the HTTP client lib passes HTTP client errors to the error handler
- LoadTableResponse response = client.post(path, request, LoadTableResponse.class, errorHandler);
+ LoadTableResponse response = client.post(path, request, LoadTableResponse.class, headers, errorHandler);
// all future commits should be simple commits
this.updateType = UpdateType.SIMPLE;
diff --git a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
index e37c60bcb..f62ea4456 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
@@ -63,4 +63,8 @@ public class ResourcePaths {
"v1", prefix, "namespaces", RESTUtil.encodeNamespace(ident.namespace()), "tables",
RESTUtil.encodeString(ident.name()));
}
+
+ public String rename() {
+ return SLASH.join("v1", prefix, "tables", "rename");
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 2d69d722d..0dea9d443 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -81,7 +81,7 @@ public class RESTCatalogAdapter implements RESTClient {
this.asNamespaceCatalog = catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
}
- private enum HTTPMethod {
+ enum HTTPMethod {
GET,
HEAD,
POST,
@@ -240,7 +240,7 @@ public class RESTCatalogAdapter implements RESTClient {
}
public <T extends RESTResponse> T execute(HTTPMethod method, String path, Object body, Class<T> responseType,
- Consumer<ErrorResponse> errorHandler) {
+ Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
ErrorResponse.Builder errorBuilder = ErrorResponse.builder();
Pair<Route, Map<String, String>> routeAndVars = Route.from(method, path);
if (routeAndVars != null) {
@@ -266,30 +266,32 @@ public class RESTCatalogAdapter implements RESTClient {
}
@Override
- public <T extends RESTResponse> T delete(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
- return execute(HTTPMethod.DELETE, path, null, responseType, errorHandler);
+ public <T extends RESTResponse> T delete(String path, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler) {
+ return execute(HTTPMethod.DELETE, path, null, responseType, headers, errorHandler);
}
@Override
public <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType,
- Consumer<ErrorResponse> errorHandler) {
- return execute(HTTPMethod.POST, path, body, responseType, errorHandler);
+ Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ return execute(HTTPMethod.POST, path, body, responseType, headers, errorHandler);
}
@Override
- public <T extends RESTResponse> T get(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
- return execute(HTTPMethod.GET, path, null, responseType, errorHandler);
+ public <T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler) {
+ return execute(HTTPMethod.GET, path, null, responseType, headers, errorHandler);
}
@Override
- public void head(String path, Consumer<ErrorResponse> errorHandler) {
- execute(HTTPMethod.HEAD, path, null, null, errorHandler);
+ public void head(String path, Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ execute(HTTPMethod.HEAD, path, null, null, headers, errorHandler);
}
@Override
public <T extends RESTResponse> T postForm(String path, Map<String, String> formData, Class<T> responseType,
Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
- return execute(HTTPMethod.POST, path, formData, responseType, errorHandler);
+ return execute(HTTPMethod.POST, path, formData, responseType, headers, errorHandler);
}
@Override
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
index fce65acc1..a77c90f24 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPClient.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.ErrorResponseParser;
import org.junit.AfterClass;
@@ -198,14 +199,14 @@ public class TestHTTPClient {
private static Item doExecuteRequest(HttpMethod method, String path, Item body, Consumer<ErrorResponse> onError) {
switch (method) {
case POST:
- return restClient.post(path, body, Item.class, onError);
+ return restClient.post(path, body, Item.class, ImmutableMap.of(), onError);
case GET:
- return restClient.get(path, Item.class, onError);
+ return restClient.get(path, Item.class, ImmutableMap.of(), onError);
case HEAD:
- restClient.head(path, onError);
+ restClient.head(path, ImmutableMap.of(), onError);
return null;
case DELETE:
- return restClient.delete(path, Item.class, onError);
+ return restClient.delete(path, Item.class, ImmutableMap.of(), onError);
default:
throw new IllegalArgumentException(String.format("Invalid method: %s", method));
}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 6bb2e199a..2ad30cd2f 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -60,9 +61,25 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
JdbcCatalog.PROPERTY_PREFIX + "password", "password");
backendCatalog.initialize("backend", backendCatalogProperties);
- RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog);
+ Map<String, String> testHeaders = ImmutableMap.of("header", "value");
+
+ RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog) {
+ @Override
+ public <T extends RESTResponse> T execute(RESTCatalogAdapter.HTTPMethod method, String path, Object body,
+ Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler) {
+ Assertions.assertEquals(headers, testHeaders, "Should pass headers through");
+ return super.execute(method, path, body, responseType, headers, errorHandler);
+ }
+ };
+
+ this.restCatalog = new RESTCatalog((config) -> adaptor) {
+ @Override
+ protected Map<String, String> headers() {
+ return testHeaders;
+ }
+ };
- this.restCatalog = new RESTCatalog((config) -> adaptor);
restCatalog.setConf(conf);
restCatalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored"));
@@ -100,18 +117,19 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
public void testConfigRoute() throws IOException {
RESTClient testClient = new RESTClient() {
@Override
- public void head(String path, Consumer<ErrorResponse> errorHandler) {
+ public void head(String path, Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
throw new UnsupportedOperationException("Should not be called for testConfigRoute");
}
@Override
- public <T extends RESTResponse> T delete(String path, Class<T> responseType,
+ public <T extends RESTResponse> T delete(String path, Class<T> responseType, Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
throw new UnsupportedOperationException("Should not be called for testConfigRoute");
}
@Override
- public <T extends RESTResponse> T get(String path, Class<T> responseType, Consumer<ErrorResponse> errorHandler) {
+ public <T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler) {
return (T) ConfigResponse
.builder()
.withDefaults(ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "1"))
@@ -121,7 +139,7 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
@Override
public <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType,
- Consumer<ErrorResponse> errorHandler) {
+ Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
throw new UnsupportedOperationException("Should not be called for testConfigRoute");
}