You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/24 13:24:04 UTC

[nifi] branch main updated: NIFI-8296 - Use API to retrieve all subjects associated with a schema id for Confluent Schema Registry v5.4.0+

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 057b4af  NIFI-8296 - Use API to retrieve all subjects associated with a schema id for Confluent Schema Registry v5.4.0+
057b4af is described below

commit 057b4af48249b8c22f4f914aac4c7ac4c3f5693c
Author: Dmitry Ibragimov <dm...@leroymerlin.ru>
AuthorDate: Sun Mar 7 16:52:41 2021 +0300

    NIFI-8296 - Use API to retrieve all subjects associated with a schema id for Confluent Schema Registry v5.4.0+
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4872.
---
 .../client/RestSchemaRegistryClient.java           | 44 +++++++++++++++-------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 4ff3f2a..6bf2c20 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -109,7 +109,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
 
     @Override
     public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
-        // The Confluent Schema Registry's REST API does not provide us with the 'subject' (name) of a Schema given the ID.
+        // The Confluent Schema Registry's version below 5.3.1 REST API does not provide us with the 'subject' (name) of a Schema given the ID.
         // It will provide us only the text of the Schema itself. Therefore, in order to determine the name (which is required for
         // a SchemaIdentifier), we must obtain a list of all Schema names, and then request each and every one of the schemas to determine
         // if the ID requested matches the Schema's ID.
@@ -119,19 +119,37 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
 
         final String schemaPath = getSchemaPath(schemaId);
         final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
-        final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects array");
-        final ArrayNode subjectsList = (ArrayNode) subjectsJson;
-
+        //Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
+        JsonNode subjectsJson = null;
+        try {
+            subjectsJson = fetchJsonResponse(schemaPath + "/subjects", "schema name");
+        } catch (SchemaNotFoundException e) {
+            logger.debug("Could not find schema name in registry by id in: + " + schemaPath);
+        }
         JsonNode completeSchema = null;
-        for (JsonNode subject: subjectsList) {
-            try {
-                final String subjectName = subject.asText();
-                completeSchema = postJsonResponse("/subjects/" + subjectName, responseJson, "schema id: " + schemaId);
-                break;
-            } catch (SchemaNotFoundException e) {
-                continue;
+        if(subjectsJson == null) {
+            final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
+            final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
+            for (JsonNode subject: subjectsAllList) {
+                try {
+                    final String searchName = subject.asText();
+                    completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
+                    break;
+                } catch (SchemaNotFoundException e) {
+                    continue;
+                }
+            }
+        } else {
+            final ArrayNode subjectsList = (ArrayNode) subjectsJson;
+            for (JsonNode subject: subjectsList) {
+                try {
+                    final String searchName = subject.asText();
+                    completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
+                    break;
+                } catch (SchemaNotFoundException e) {
+                    continue;
+                }
             }
-
         }
 
         if(completeSchema == null) {
@@ -154,7 +172,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
             return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
         } catch (final SchemaParseException spe) {
             throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
-                + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
+                    + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
         }
     }