You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2024/02/28 13:42:26 UTC
(nifi) branch main updated: NIFI-12733 Make Apicurio's groupId optional and configurable and use artifactId instead of name as key
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ecea18f796 NIFI-12733 Make Apicurio's groupId optional and configurable and use artifactId instead of name as key
ecea18f796 is described below
commit ecea18f79655c0e34949d94609c8909aeb2d093e
Author: Juldrixx <ju...@gmail.com>
AuthorDate: Fri Feb 2 17:44:37 2024 +0100
NIFI-12733 Make Apicurio's groupId optional and configurable and use artifactId instead of name as key
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #8351.
---
.../schemaregistry/ApicurioSchemaRegistry.java | 24 +++++++----
.../client/ApicurioSchemaRegistryClient.java | 39 ++++++------------
.../client/CachingSchemaRegistryClient.java | 21 ++--------
.../client/SchemaRegistryApiClient.java | 37 +++++++----------
.../client/SchemaRegistryClient.java | 6 +--
.../apicurio/schemaregistry/util/SchemaUtils.java | 44 ++++++---------------
.../client/ApicurioSchemaRegistryClientTest.java | 46 +++++++++-------------
.../client/CachingSchemaRegistryClientTest.java | 26 ++++++------
.../client/SchemaRegistryApiClientTest.java | 36 ++++++-----------
.../schemaregistry/util/SchemaUtilsTest.java | 23 +----------
.../src/test/resources/metadata_response.json | 15 -------
.../test/resources/schema_response_version_3.json | 15 +++++++
.../resources/schema_response_version_latest.json | 15 +++++++
.../src/test/resources/search_response.json | 16 --------
14 files changed, 135 insertions(+), 228 deletions(-)
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java
index 63a68949ec..d36b34c21b 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/ApicurioSchemaRegistry.java
@@ -26,6 +26,7 @@ import org.apache.nifi.apicurio.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -43,7 +44,7 @@ import java.util.concurrent.TimeUnit;
@Tags({"schema", "registry", "apicurio", "avro"})
@CapabilityDescription("Provides a Schema Registry that interacts with the Apicurio Schema Registry so that those Schemas that are stored in the Apicurio Schema "
- + "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Apicurio Schema Registry with their names.")
+ + "Registry can be used in NiFi. When a Schema is looked up by name by this registry, it will find a Schema in the Apicurio Schema Registry with their artifact identifiers.")
public class ApicurioSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
@@ -56,6 +57,15 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements
.addValidator(StandardValidators.URL_VALIDATOR)
.required(true)
.build();
+ static final PropertyDescriptor SCHEMA_GROUP_ID = new PropertyDescriptor.Builder()
+ .name("Schema Group ID")
+ .displayName("Schema Group ID")
+ .description("The artifact Group ID for the schemas")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .defaultValue("default")
+ .required(true)
+ .build();
static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Cache Size")
.displayName("Cache Size")
@@ -86,6 +96,7 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_REGISTRY_URL,
+ SCHEMA_GROUP_ID,
CACHE_SIZE,
CACHE_EXPIRATION,
WEB_CLIENT_PROVIDER
@@ -102,29 +113,26 @@ public class ApicurioSchemaRegistry extends AbstractControllerService implements
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String schemaRegistryUrl = context.getProperty(SCHEMA_REGISTRY_URL).getValue();
+ // Schema Group ID supports Expression Language (ENVIRONMENT scope), so evaluate it before reading the value
+ final String schemaGroupId = context.getProperty(SCHEMA_GROUP_ID).evaluateAttributeExpressions().getValue();
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
final WebClientServiceProvider webClientServiceProvider =
context.getProperty(WEB_CLIENT_PROVIDER).asControllerService(WebClientServiceProvider.class);
- final SchemaRegistryApiClient apiClient = new SchemaRegistryApiClient(webClientServiceProvider, schemaRegistryUrl);
+ final SchemaRegistryApiClient apiClient = new SchemaRegistryApiClient(webClientServiceProvider, schemaRegistryUrl, schemaGroupId);
final SchemaRegistryClient schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
client = new CachingSchemaRegistryClient(schemaRegistryClient, cacheSize, cacheExpiration);
}
@Override
public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
- final String schemaName = schemaIdentifier.getName().orElseThrow(
+ final String schemaId = schemaIdentifier.getName().orElseThrow(
() -> new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present")
);
final OptionalInt version = schemaIdentifier.getVersion();
- if (version.isPresent()) {
- return client.getSchema(schemaName, version.getAsInt());
- } else {
- return client.getSchema(schemaName);
- }
+ return client.getSchema(schemaId, version);
}
@Override
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java
index 793113705a..e7c0947156 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClient.java
@@ -17,13 +17,13 @@
package org.apache.nifi.apicurio.schemaregistry.client;
import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils;
-import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.OptionalInt;
public class ApicurioSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryApiClient apiClient;
@@ -33,37 +33,20 @@ public class ApicurioSchemaRegistryClient implements SchemaRegistryClient {
}
@Override
- public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
- final ResultAttributes attributes = getAttributesForSchemaName(schemaName);
- final int version = getVersionAttributeFromMetadata(attributes);
- return createRecordSchemaForAttributes(attributes, version);
+ public RecordSchema getSchema(final String schemaId, final OptionalInt version) throws IOException, SchemaNotFoundException {
+ return createRecordSchemaForAttributes(
+ schemaId,
+ version
+ );
}
- @Override
- public RecordSchema getSchema(final String schemaName, final int version) throws IOException, SchemaNotFoundException {
- final ResultAttributes attributes = getAttributesForSchemaName(schemaName);
- return createRecordSchemaForAttributes(attributes, version);
- }
-
- private ResultAttributes getAttributesForSchemaName(String schemaName) throws IOException {
- final URI searchUri = apiClient.buildSearchUri(schemaName);
- try (final InputStream searchResultStream = apiClient.retrieveResponse(searchUri)) {
- return SchemaUtils.getResultAttributes(searchResultStream);
- }
- }
-
- private int getVersionAttributeFromMetadata(final ResultAttributes attributes) throws IOException {
- final URI metaDataUri = apiClient.buildMetaDataUri(attributes.groupId(), attributes.artifactId());
- try (final InputStream metadataResultStream = apiClient.retrieveResponse(metaDataUri)) {
- return SchemaUtils.extractVersionAttributeFromStream(metadataResultStream);
- }
- }
-
- private RecordSchema createRecordSchemaForAttributes(ResultAttributes attributes, int version) throws IOException, SchemaNotFoundException {
- final URI schemaUri = apiClient.buildSchemaVersionUri(attributes.groupId(), attributes.artifactId(), version);
+ private RecordSchema createRecordSchemaForAttributes(final String artifactId, final OptionalInt version) throws IOException, SchemaNotFoundException {
+ final URI schemaUri = version.isPresent()
+ ? apiClient.buildSchemaVersionUri(artifactId, version.getAsInt()) :
+ apiClient.buildSchemaArtifactUri(artifactId);
try (final InputStream schemaResultStream = apiClient.retrieveResponse(schemaUri)) {
- return SchemaUtils.createRecordSchema(schemaResultStream, attributes.name(), version);
+ return SchemaUtils.createRecordSchema(schemaResultStream, artifactId, version);
}
}
}
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java
index cfb8aff1dd..ba5d74cb1a 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClient.java
@@ -23,10 +23,11 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.record.RecordSchema;
import java.time.Duration;
+import java.util.OptionalInt;
public class CachingSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryClient client;
- private final LoadingCache<Pair<String, Integer>, RecordSchema> schemaCache;
+ private final LoadingCache<Pair<String, OptionalInt>, RecordSchema> schemaCache;
public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) {
this.client = toWrap;
@@ -34,24 +35,10 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient {
schemaCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
- .build(key -> {
- if (key.getRight() == -1) {
- // If the version in the key is -1, fetch the schema by name only.
- return client.getSchema(key.getLeft());
- } else {
- // If a specific version is provided in the key, fetch the schema with that version.
- return client.getSchema(key.getLeft(), key.getRight());
- }
- });
+ .build(key -> client.getSchema(key.getLeft(), key.getRight()));
}
-
- @Override
- public RecordSchema getSchema(final String schemaName) {
- return schemaCache.get(Pair.of(schemaName, -1));
- }
-
@Override
- public RecordSchema getSchema(final String schemaName, final int version) {
+ public RecordSchema getSchema(final String schemaName, final OptionalInt version) {
return schemaCache.get(Pair.of(schemaName, version));
}
}
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java
index 57488794a8..8d030e29d8 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClient.java
@@ -26,10 +26,12 @@ public class SchemaRegistryApiClient {
private final WebClientServiceProvider webClientServiceProvider;
private final String baseUrl;
+ private final String groupId;
- public SchemaRegistryApiClient(final WebClientServiceProvider webClientServiceProvider, final String baseUrl) {
+ public SchemaRegistryApiClient(final WebClientServiceProvider webClientServiceProvider, final String baseUrl, final String groupId) {
this.webClientServiceProvider = webClientServiceProvider;
this.baseUrl = baseUrl;
+ this.groupId = groupId;
}
public InputStream retrieveResponse(final URI uri) {
@@ -51,37 +53,26 @@ public class SchemaRegistryApiClient {
.addPathSegment("v2");
}
- public URI buildSearchUri(final String schemaName) {
+ private HttpUriBuilder buildBaseSchemaUri() {
return buildBaseUri()
- .addPathSegment("search")
- .addPathSegment("artifacts")
- .addQueryParameter("name", schemaName)
- .addQueryParameter("limit", "1")
- .build();
+ .addPathSegment("groups")
+ .addPathSegment(this.groupId);
}
- public URI buildMetaDataUri(final String groupId, final String artifactId) {
- return buildGroupArtifactsUri(groupId, artifactId)
- .addPathSegment("meta")
- .build();
+ private HttpUriBuilder buildBaseSchemaArtifactUri(final String artifactId) {
+ return buildBaseSchemaUri()
+ .addPathSegment("artifacts")
+ .addPathSegment(artifactId);
}
- public URI buildSchemaUri(final String groupId, final String artifactId) {
- return buildGroupArtifactsUri(groupId, artifactId).build();
+ public URI buildSchemaArtifactUri(final String artifactId) {
+ return buildBaseSchemaArtifactUri(artifactId).build();
}
- public URI buildSchemaVersionUri(final String groupId, final String artifactId, final int version) {
- return buildGroupArtifactsUri(groupId, artifactId)
+ public URI buildSchemaVersionUri(final String artifactId, final int version) {
+ return buildBaseSchemaArtifactUri(artifactId)
.addPathSegment("versions")
.addPathSegment(String.valueOf(version))
.build();
}
-
- private HttpUriBuilder buildGroupArtifactsUri(final String groupId, final String artifactId) {
- return buildBaseUri()
- .addPathSegment("groups")
- .addPathSegment(groupId)
- .addPathSegment("artifacts")
- .addPathSegment(artifactId);
- }
}
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java
index ba21b50734..39a6cf24a2 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryClient.java
@@ -20,12 +20,10 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
+import java.util.OptionalInt;
public interface SchemaRegistryClient {
- RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException;
-
- RecordSchema getSchema(final String schemaName, final int version) throws IOException, SchemaNotFoundException;
-
+ RecordSchema getSchema(final String schemaId, final OptionalInt version) throws IOException, SchemaNotFoundException;
}
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java
index 5202a9e14e..4f77104e63 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/main/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtils.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.io.UncheckedIOException;
+import java.util.OptionalInt;
public class SchemaUtils {
@@ -39,50 +39,28 @@ public class SchemaUtils {
private SchemaUtils() {
}
- public static RecordSchema createRecordSchema(final InputStream schemaStream, final String name, final int version) throws SchemaNotFoundException, IOException {
+ public static RecordSchema createRecordSchema(final InputStream schemaStream, final String artifactId, final OptionalInt version) throws SchemaNotFoundException, IOException {
final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaStream);
final String schemaText = schemaNode.toString();
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
- final SchemaIdentifier schemaId = SchemaIdentifier.builder()
- .name(name)
- .version(version)
- .build();
+ final SchemaIdentifier.Builder schemaIdBuilder = SchemaIdentifier.builder()
+ .name(artifactId);
+
+ if (version.isPresent()) {
+ schemaIdBuilder.version(version.getAsInt());
+ }
+
+ final SchemaIdentifier schemaId = schemaIdBuilder.build();
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException e) {
final String errorMessage = String.format("Obtained Schema with name [%s] from Apicurio Schema Registry but the Schema Text " +
- "that was returned is not a valid Avro Schema", name);
+ "that was returned is not a valid Avro Schema", artifactId);
throw new SchemaNotFoundException(errorMessage, e);
}
}
- public static int extractVersionAttributeFromStream(InputStream in) {
- final JsonNode metadataNode;
- try {
- metadataNode = OBJECT_MAPPER.readTree(in);
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to read version from HTTP response stream", e);
- }
- return Integer.parseInt(metadataNode.get("version").asText());
- }
-
- public static ResultAttributes getResultAttributes(InputStream in) {
- final JsonNode jsonNode;
- try {
- jsonNode = OBJECT_MAPPER.readTree(in);
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to read result attributes from HTTP response stream", e);
- }
- final JsonNode artifactNode = jsonNode.get("artifacts").get(0);
- final String groupId = artifactNode.get("groupId").asText();
- final String artifactId = artifactNode.get("id").asText();
- final String name = artifactNode.get("name").asText();
- return new ResultAttributes(groupId, artifactId, name);
- }
-
- public record ResultAttributes(String groupId, String artifactId, String name) {
- }
static ObjectMapper setObjectMapper() {
return new ObjectMapper();
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java
index 671297d23f..9cbbdedbf3 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/ApicurioSchemaRegistryClientTest.java
@@ -19,7 +19,6 @@ package org.apache.nifi.apicurio.schemaregistry.client;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -29,6 +28,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.Charset;
+import java.util.OptionalInt;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn;
@@ -39,55 +39,45 @@ import static org.mockito.Mockito.verify;
class ApicurioSchemaRegistryClientTest {
private static final String TEST_URL = "http://test.apicurio-schema-registry.com:8888";
- private static final String SCHEMA_NAME = "schema1";
+ private static final String ARTIFACT_ID = "artifactId1";
private static final int VERSION = 3;
- private static final String SEARCH_URL = TEST_URL + "/search";
- private static final String METADATA_URL = TEST_URL + "/meta";
+ private static final String SCHEMA_ARTIFACT_URL = TEST_URL + "/schema/" + ARTIFACT_ID;
private static final String SCHEMA_VERSION_URL = TEST_URL + "/schema/versions/" + VERSION;
- private static final String GROUP_ID = "groupId1";
- private static final String ARTIFACT_ID = "artifactId1";
@Mock
private SchemaRegistryApiClient apiClient;
private ApicurioSchemaRegistryClient schemaRegistryClient;
- @BeforeEach
- void setup() {
- doReturn(URI.create(SEARCH_URL)).when(apiClient).buildSearchUri(SCHEMA_NAME);
- doReturn(URI.create(SCHEMA_VERSION_URL)).when(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION);
- doReturn(getResource("search_response.json")).when(apiClient).retrieveResponse(URI.create(SEARCH_URL));
- doReturn(getResource("schema_response.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_VERSION_URL));
- }
-
@Test
- void testGetSchemaWithNameInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
- doReturn(URI.create(METADATA_URL)).when(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
- doReturn(getResource("metadata_response.json")).when(apiClient).retrieveResponse(URI.create(METADATA_URL));
+ void testGetSchemaWithIdInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
+ doReturn(URI.create(SCHEMA_ARTIFACT_URL)).when(apiClient).buildSchemaArtifactUri(ARTIFACT_ID);
+ doReturn(getResource("schema_response_version_latest.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_ARTIFACT_URL));
schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
- final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME);
+ final RecordSchema schema = schemaRegistryClient.getSchema(ARTIFACT_ID, OptionalInt.empty());
- verify(apiClient).buildSearchUri(SCHEMA_NAME);
- verify(apiClient).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
- verify(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION);
+ verify(apiClient).buildSchemaArtifactUri(ARTIFACT_ID);
+ verify(apiClient, never()).buildSchemaVersionUri(ARTIFACT_ID, VERSION);
- final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"), Charset.defaultCharset())
+ final String expectedSchemaText = IOUtils.toString(getResource("schema_response_version_latest.json"), Charset.defaultCharset())
.replace("\n", "")
.replaceAll(" +", "");
assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty")));
}
@Test
- void testGetSchemaWithNameAndVersionInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
+ void testGetSchemaWithIdAndVersionInvokesApiClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
+ doReturn(URI.create(SCHEMA_VERSION_URL)).when(apiClient).buildSchemaVersionUri(ARTIFACT_ID, VERSION);
+ doReturn(getResource("schema_response_version_3.json")).when(apiClient).retrieveResponse(URI.create(SCHEMA_VERSION_URL));
+
schemaRegistryClient = new ApicurioSchemaRegistryClient(apiClient);
- final RecordSchema schema = schemaRegistryClient.getSchema(SCHEMA_NAME, 3);
+ final RecordSchema schema = schemaRegistryClient.getSchema(ARTIFACT_ID, OptionalInt.of(VERSION));
- verify(apiClient).buildSearchUri(SCHEMA_NAME);
- verify(apiClient, never()).buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
- verify(apiClient).buildSchemaVersionUri(GROUP_ID, ARTIFACT_ID, VERSION);
+ verify(apiClient, never()).buildSchemaArtifactUri(ARTIFACT_ID);
+ verify(apiClient).buildSchemaVersionUri(ARTIFACT_ID, VERSION);
- final String expectedSchemaText = IOUtils.toString(getResource("schema_response.json"), Charset.defaultCharset())
+ final String expectedSchemaText = IOUtils.toString(getResource("schema_response_version_3.json"), Charset.defaultCharset())
.replace("\n", "")
.replaceAll(" +", "");
assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty")));
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java
index 24eda7a159..ef3f63dbe5 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/CachingSchemaRegistryClientTest.java
@@ -29,6 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.util.Arrays;
+import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,35 +61,36 @@ class CachingSchemaRegistryClientTest {
@Test
void testGetSchemaWithNameInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException {
- when(mockClient.getSchema(SCHEMA_NAME)).thenReturn(TEST_SCHEMA);
+ final OptionalInt version = OptionalInt.empty();
- RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME);
- RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME);
+ when(mockClient.getSchema(SCHEMA_NAME, version)).thenReturn(TEST_SCHEMA);
+
+ RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME, version);
+ RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME, version);
assertEquals(TEST_SCHEMA, actualSchema1);
assertEquals(TEST_SCHEMA, actualSchema2);
- verify(mockClient).getSchema(SCHEMA_NAME);
+ verify(mockClient).getSchema(SCHEMA_NAME, version);
}
@Test
void testGetSchemaWithNameAndVersionInvokesClientAndCacheResult() throws IOException, SchemaNotFoundException {
- String schemaName = "schema";
- int version = 1;
+ final OptionalInt version = OptionalInt.of(1);
- when(mockClient.getSchema(schemaName, version)).thenReturn(TEST_SCHEMA);
+ when(mockClient.getSchema(SCHEMA_NAME, version)).thenReturn(TEST_SCHEMA);
- RecordSchema actualSchema1 = cachingClient.getSchema(schemaName, version);
- RecordSchema actualSchema2 = cachingClient.getSchema(schemaName, version);
+ RecordSchema actualSchema1 = cachingClient.getSchema(SCHEMA_NAME, version);
+ RecordSchema actualSchema2 = cachingClient.getSchema(SCHEMA_NAME, version);
assertEquals(TEST_SCHEMA, actualSchema1);
assertEquals(TEST_SCHEMA, actualSchema2);
- verify(mockClient).getSchema(schemaName, version);
+ verify(mockClient).getSchema(SCHEMA_NAME, version);
}
@Test
void testGetSchemaWithNameAndVersionDoesNotCacheDifferentVersions() throws IOException, SchemaNotFoundException {
- int version1 = 1;
- int version2 = 2;
+ final OptionalInt version1 = OptionalInt.of(1);
+ final OptionalInt version2 = OptionalInt.of(2);
RecordSchema expectedSchema1 = TEST_SCHEMA;
RecordSchema expectedSchema2 = TEST_SCHEMA_2;
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java
index 8551c6fb58..486075a7d0 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/client/SchemaRegistryApiClientTest.java
@@ -35,12 +35,12 @@ class SchemaRegistryApiClientTest {
private static final String BASE_URL = "http://test.apicurio-schema-registry.com:8888";
private static final String API_PATH = "/apis/registry/v2";
- private static final String METADATA_PATH = "/meta";
private static final String GROUP_ID = "groupId1";
private static final String ARTIFACT_ID = "artifactId1";
- private static final String SCHEMA_PATH = String.format("/groups/%s/artifacts/%s", GROUP_ID, ARTIFACT_ID);
- private static final String SCHEMA_NAME = "schema1";
- private static final String SEARCH_PATH = String.format("/search/artifacts?name=%s&limit=1", SCHEMA_NAME);
+ private static final int VERSION = 3;
+ private static final String GROUP_PATH = String.format("/groups/%s", GROUP_ID);
+ private static final String ARTIFACT_PATH = String.format("/artifacts/%s", ARTIFACT_ID);
+ private static final String VERSION_PATH = String.format("/versions/%d", VERSION);
@Mock
private WebClientServiceProvider webClientServiceProvider;
@@ -53,7 +53,7 @@ class SchemaRegistryApiClientTest {
@Test
void testBuildBaseUrl() {
- client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL);
+ client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID);
final HttpUriBuilder httpUriBuilder = client.buildBaseUri();
@@ -61,30 +61,20 @@ class SchemaRegistryApiClientTest {
}
@Test
- void testBuildSearchUri() {
- client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL);
+ void testBuildSchemaArtifactUri() {
+ client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID);
- final URI uri = client.buildSearchUri(SCHEMA_NAME);
+ final URI uri = client.buildSchemaArtifactUri(ARTIFACT_ID);
- assertEquals(BASE_URL + API_PATH + SEARCH_PATH, uri.toString());
+ assertEquals(BASE_URL + API_PATH + GROUP_PATH + ARTIFACT_PATH, uri.toString());
}
@Test
- void testBuildMetadataUri() {
- client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL);
+ void testBuildSchemaVersionUri() {
+ client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL, GROUP_ID);
- final URI uri = client.buildMetaDataUri(GROUP_ID, ARTIFACT_ID);
+ final URI uri = client.buildSchemaVersionUri(ARTIFACT_ID, VERSION);
- assertEquals(BASE_URL + API_PATH + SCHEMA_PATH + METADATA_PATH, uri.toString());
+ assertEquals(BASE_URL + API_PATH + GROUP_PATH + ARTIFACT_PATH + VERSION_PATH, uri.toString());
}
-
- @Test
- void testBuildSchemaUri() {
- client = new SchemaRegistryApiClient(webClientServiceProvider, BASE_URL);
-
- final URI uri = client.buildSchemaUri(GROUP_ID, ARTIFACT_ID);
-
- assertEquals(BASE_URL + API_PATH + SCHEMA_PATH, uri.toString());
- }
-
}
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java
index 7b745f6344..cded22eb7f 100644
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/java/org/apache/nifi/apicurio/schemaregistry/util/SchemaUtilsTest.java
@@ -17,7 +17,6 @@
package org.apache.nifi.apicurio.schemaregistry.util;
import org.apache.commons.io.IOUtils;
-import org.apache.nifi.apicurio.schemaregistry.util.SchemaUtils.ResultAttributes;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;
@@ -25,6 +24,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
+import java.util.OptionalInt;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,7 +34,7 @@ class SchemaUtilsTest {
void testCreateRecordSchema() throws SchemaNotFoundException, IOException {
final InputStream in = getResource("schema_response.json");
- final RecordSchema schema = SchemaUtils.createRecordSchema(in, "schema1", 3);
+ final RecordSchema schema = SchemaUtils.createRecordSchema(in, "schema1", OptionalInt.of(3));
assertEquals("schema1", schema.getSchemaName().orElseThrow(() -> new AssertionError("Schema Name is empty")));
assertEquals("schema_namespace_1", schema.getSchemaNamespace().orElseThrow(() -> new AssertionError("Schema Namespace is empty")));
@@ -46,25 +46,6 @@ class SchemaUtilsTest {
assertEquals(expectedSchemaText, schema.getSchemaText().orElseThrow(() -> new AssertionError("Schema Text is empty")));
}
- @Test
- void testGetVersionAttribute() {
- final InputStream in = getResource("metadata_response.json");
-
- int version = SchemaUtils.extractVersionAttributeFromStream(in);
-
- assertEquals(3, version);
- }
-
- @Test
- void testGetResultAttributes() {
- final InputStream in = getResource("search_response.json");
-
- final ResultAttributes resultAttributes = SchemaUtils.getResultAttributes(in);
-
- final ResultAttributes expectedAttributes = new ResultAttributes("groupId1", "artifactId1", "schema1");
- assertEquals(expectedAttributes, resultAttributes);
- }
-
private InputStream getResource(final String resourceName) {
return this.getClass().getClassLoader().getResourceAsStream(resourceName);
}
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/metadata_response.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/metadata_response.json
deleted file mode 100644
index 96459721ca..0000000000
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/metadata_response.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "name": "schema1",
- "createdBy": "",
- "createdOn": "2023-10-16T14:19:21+0000",
- "modifiedBy": "",
- "modifiedOn": "2023-10-16T14:53:12+0000",
- "id": "artifactId1",
- "version": "3",
- "type": "AVRO",
- "globalId": 3,
- "state": "ENABLED",
- "groupId": "groupId1",
- "contentId": 2,
- "references": []
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_3.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_3.json
new file mode 100644
index 0000000000..9adfffeaa2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_3.json
@@ -0,0 +1,15 @@
+{
+ "type": "record",
+ "namespace": "schema_namespace",
+ "name": "schema_version_3",
+ "fields": [
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "age",
+ "type": "int"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_latest.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_latest.json
new file mode 100644
index 0000000000..87f1e60e8d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/schema_response_version_latest.json
@@ -0,0 +1,15 @@
+{
+ "type": "record",
+ "namespace": "schema_namespace",
+ "name": "schema_version_latest",
+ "fields": [
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "age",
+ "type": "int"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/search_response.json b/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/search_response.json
deleted file mode 100644
index 5db50a7a15..0000000000
--- a/nifi-nar-bundles/nifi-apicurio-bundle/nifi-apicurio-schema-registry-service/src/test/resources/search_response.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "artifacts": [
- {
- "id": "artifactId1",
- "name": "schema1",
- "createdOn": "2023-10-16T14:19:21+0000",
- "createdBy": "",
- "type": "AVRO",
- "state": "ENABLED",
- "modifiedOn": "2023-10-16T14:19:31+0000",
- "modifiedBy": "",
- "groupId": "groupId1"
- }
- ],
- "count": 1
-}
\ No newline at end of file