You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/24 13:24:04 UTC
[nifi] branch main updated: NIFI-8296 - Use API to retrieve all
subjects associated with a schema id for Confluent Schema Registry v5.4.0+
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 057b4af NIFI-8296 - Use API to retrieve all subjects associated with a schema id for Confluent Schema Registry v5.4.0+
057b4af is described below
commit 057b4af48249b8c22f4f914aac4c7ac4c3f5693c
Author: Dmitry Ibragimov <dm...@leroymerlin.ru>
AuthorDate: Sun Mar 7 16:52:41 2021 +0300
NIFI-8296 - Use API to retrieve all subjects associated with a schema id for Confluent Schema Registry v5.4.0+
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4872.
---
.../client/RestSchemaRegistryClient.java | 44 +++++++++++++++-------
1 file changed, 31 insertions(+), 13 deletions(-)
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 4ff3f2a..6bf2c20 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -109,7 +109,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
@Override
public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
- // The Confluent Schema Registry's REST API does not provide us with the 'subject' (name) of a Schema given the ID.
+ // The REST API of Confluent Schema Registry versions below 5.3.1 does not provide us with the 'subject' (name) of a Schema given the ID.
// It will provide us only the text of the Schema itself. Therefore, in order to determine the name (which is required for
// a SchemaIdentifier), we must obtain a list of all Schema names, and then request each and every one of the schemas to determine
// if the ID requested matches the Schema's ID.
@@ -119,19 +119,37 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String schemaPath = getSchemaPath(schemaId);
final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
- final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects array");
- final ArrayNode subjectsList = (ArrayNode) subjectsJson;
-
+ // Get the subject names associated with the schema id; this works only with Confluent Schema Registry v5.3.1+
+ JsonNode subjectsJson = null;
+ try {
+ subjectsJson = fetchJsonResponse(schemaPath + "/subjects", "schema name");
+ } catch (SchemaNotFoundException e) {
+ logger.debug("Could not find schema name in registry by id in: + " + schemaPath);
+ }
JsonNode completeSchema = null;
- for (JsonNode subject: subjectsList) {
- try {
- final String subjectName = subject.asText();
- completeSchema = postJsonResponse("/subjects/" + subjectName, responseJson, "schema id: " + schemaId);
- break;
- } catch (SchemaNotFoundException e) {
- continue;
+ if(subjectsJson == null) {
+ final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
+ final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
+ for (JsonNode subject: subjectsAllList) {
+ try {
+ final String searchName = subject.asText();
+ completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
+ break;
+ } catch (SchemaNotFoundException e) {
+ continue;
+ }
+ }
+ } else {
+ final ArrayNode subjectsList = (ArrayNode) subjectsJson;
+ for (JsonNode subject: subjectsList) {
+ try {
+ final String searchName = subject.asText();
+ completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
+ break;
+ } catch (SchemaNotFoundException e) {
+ continue;
+ }
}
-
}
if(completeSchema == null) {
@@ -154,7 +172,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
- + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
+ + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
}
}