You are viewing a plain-text version of this content. The link to the canonical version (originally a hyperlink labeled "here") was not preserved in this plain-text rendering.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/07/30 21:16:50 UTC

[nifi] branch main updated: NIFI-8785 Confluent Schema Registry REST client refactoring

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5117fc0  NIFI-8785 Confluent Schema Registry REST client refactoring
5117fc0 is described below

commit 5117fc0619a5f618100f210474ed3aff3a58c60a
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Thu Jul 22 18:53:13 2021 +0200

    NIFI-8785 Confluent Schema Registry REST client refactoring
    
    - Added debug logs and a new method to get schema info without making subjects API calls
    
    This closes #5250
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../client/RestSchemaRegistryClient.java           | 142 ++++++++++++++++-----
 1 file changed, 107 insertions(+), 35 deletions(-)

diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 318d2d7..33e74bc 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -122,34 +122,71 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
 
         // Check if we have cached the Identifier to Name mapping
 
+        JsonNode completeSchema = null;
+
+        // We get the schema definition using the ID of the schema
+        // GET /schemas/ids/{int: id}
         final String schemaPath = getSchemaPath(schemaId);
-        final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
-        //Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
+        final JsonNode schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId);
+
+        // Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
+        // GET /schemas/ids/{int: id}/subjects
         JsonNode subjectsJson = null;
         try {
             subjectsJson = fetchJsonResponse(schemaPath + "/subjects", "schema name");
+
+            if(subjectsJson != null) {
+                final ArrayNode subjectsList = (ArrayNode) subjectsJson;
+                for (JsonNode subject: subjectsList) {
+                    final String searchName = subject.asText();
+                    try {
+                        // get complete schema (name + id + version) using the subject name API
+                        completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
+                        break;
+                    } catch (SchemaNotFoundException e) {
+                        logger.debug("Could not find schema in registry by subject name " + searchName, e);
+                        continue;
+                    }
+                }
+            }
+
         } catch (SchemaNotFoundException e) {
-            logger.debug("Could not find schema name in registry by id in: + " + schemaPath);
+            logger.debug("Could not find schema name in registry by id in: " + schemaPath);
         }
-        JsonNode completeSchema = null;
-        if(subjectsJson == null) {
+
+        // Get all couples (subject name, version) for a given schema ID
+        // GET /schemas/ids/{int: id}/versions
+        if(completeSchema == null) {
+            JsonNode subjectsVersions = fetchJsonResponse(schemaPath + "/versions", "schema name");
+
+            if(subjectsVersions != null) {
+                final ArrayNode subjectsVersionsList = (ArrayNode) subjectsVersions;
+                // we want to make sure we get the latest version
+                int maxVersion = 0;
+                String subjectName = null;
+                for (JsonNode subjectVersion: subjectsVersionsList) {
+                    int currentVersion = subjectVersion.get(VERSION_FIELD_NAME).asInt();
+                    String currentSubjectName = subjectVersion.get(SUBJECT_FIELD_NAME).asText();
+                    if(currentVersion > maxVersion) {
+                        maxVersion = currentVersion;
+                        subjectName = currentSubjectName;
+                    }
+                }
+
+                if(subjectName != null) {
+                    return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
+                }
+            }
+        }
+
+        // Last resort option: we get the full list of subjects and check one by one to get the complete schema info
+        if(completeSchema == null) {
             final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
             final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
             for (JsonNode subject: subjectsAllList) {
                 try {
                     final String searchName = subject.asText();
-                    completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
-                    break;
-                } catch (SchemaNotFoundException e) {
-                    continue;
-                }
-            }
-        } else {
-            final ArrayNode subjectsList = (ArrayNode) subjectsJson;
-            for (JsonNode subject: subjectsList) {
-                try {
-                    final String searchName = subject.asText();
-                    completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
+                    completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
                     break;
                 } catch (SchemaNotFoundException e) {
                     continue;
@@ -158,12 +195,23 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
         }
 
         if(completeSchema == null) {
-            throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
+            throw new SchemaNotFoundException("Could not get schema with id: " + schemaId);
         }
 
         return createRecordSchema(completeSchema);
     }
 
+    private RecordSchema createRecordSchema(final String name, final int version, final int id, final String schema) throws SchemaNotFoundException {
+        try {
+            final Schema avroSchema = new Schema.Parser().parse(schema);
+            final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(name).id((long) id).version(version).build();
+            return AvroTypeUtil.createSchema(avroSchema, schema, schemaId);
+        } catch (final SchemaParseException spe) {
+            throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name
+                    + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
+        }
+    }
+
     private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
         final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText();
         final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
@@ -173,7 +221,6 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
         try {
             final Schema avroSchema = new Schema.Parser().parse(schemaText);
             final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
-
             return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
         } catch (final SchemaParseException spe) {
             throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
@@ -196,6 +243,9 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
             final String path = getPath(pathSuffix);
             final String trimmedBase = getTrimmedBase(baseUrl);
             final String url = trimmedBase + path;
+
+            logger.debug("POST JSON response URL {}", url);
+
             final WebTarget webTarget = client.target(url);
             Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE);
             for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
@@ -204,26 +254,40 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
             final Response response = builder.post(Entity.json(schema.toString()));
             final int responseCode = response.getStatus();
 
-            if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
-                continue;
-            }
+            switch (Response.Status.fromStatusCode(responseCode)) {
+                case OK:
+                    JsonNode jsonResponse =  response.readEntity(JsonNode.class);
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("JSON Response: {}", jsonResponse);
+                    }
 
-            if(responseCode == Response.Status.OK.getStatusCode()) {
-                return response.readEntity(JsonNode.class);
+                    return jsonResponse;
+
+                case NOT_FOUND:
+                    logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl);
+                    continue;
+
+                default:
+                    errorMessage = response.readEntity(String.class);
+                    continue;
             }
         }
 
-        throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: "
+        throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription
+                + " from any of the Confluent Schema Registry URL's provided; failure response message: "
                 + errorMessage);
     }
 
-    private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException {
+    private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException {
         String errorMessage = null;
         for (final String baseUrl : baseUrls) {
             final String path = getPath(pathSuffix);
             final String trimmedBase = getTrimmedBase(baseUrl);
             final String url = trimmedBase + path;
 
+            logger.debug("GET JSON response URL {}", url);
+
             final WebTarget webTarget = client.target(url);
             Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON);
             for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
@@ -232,20 +296,28 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
             final Response response = builder.get();
             final int responseCode = response.getStatus();
 
-            if (responseCode == Response.Status.OK.getStatusCode()) {
-                return response.readEntity(JsonNode.class);
-            }
+            switch (Response.Status.fromStatusCode(responseCode)) {
+                case OK:
+                    JsonNode jsonResponse =  response.readEntity(JsonNode.class);
 
-            if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
-                throw new SchemaNotFoundException("Could not find Schema with " + schemaDescription + " from the Confluent Schema Registry located at " + baseUrl);
-            }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("JSON Response {}", jsonResponse);
+                    }
+
+                    return jsonResponse;
 
-            if (errorMessage == null) {
-                errorMessage = response.readEntity(String.class);
+                case NOT_FOUND:
+                    logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl);
+                    continue;
+
+                default:
+                    errorMessage = response.readEntity(String.class);
+                    continue;
             }
         }
 
-        throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
+        throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription
+                + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
     }
 
     private String getTrimmedBase(String baseUrl) {