You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/07/30 21:16:50 UTC
[nifi] branch main updated: NIFI-8785 Confluent Schema Registry
REST client refactoring
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5117fc0 NIFI-8785 Confluent Schema Registry REST client refactoring
5117fc0 is described below
commit 5117fc0619a5f618100f210474ed3aff3a58c60a
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Thu Jul 22 18:53:13 2021 +0200
NIFI-8785 Confluent Schema Registry REST client refactoring
- Added debug logs and a new method to get schema info without making subjects API calls
This closes #5250
Signed-off-by: David Handermann <ex...@apache.org>
---
.../client/RestSchemaRegistryClient.java | 142 ++++++++++++++++-----
1 file changed, 107 insertions(+), 35 deletions(-)
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 318d2d7..33e74bc 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -122,34 +122,71 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
// Check if we have cached the Identifier to Name mapping
+ JsonNode completeSchema = null;
+
+ // We get the schema definition using the ID of the schema
+ // GET /schemas/ids/{int: id}
final String schemaPath = getSchemaPath(schemaId);
- final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + schemaId);
- //Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
+ final JsonNode schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId);
+
+ // Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
+ // GET /schemas/ids/{int: id}/subjects
JsonNode subjectsJson = null;
try {
subjectsJson = fetchJsonResponse(schemaPath + "/subjects", "schema name");
+
+ if(subjectsJson != null) {
+ final ArrayNode subjectsList = (ArrayNode) subjectsJson;
+ for (JsonNode subject: subjectsList) {
+ final String searchName = subject.asText();
+ try {
+ // get complete schema (name + id + version) using the subject name API
+ completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
+ break;
+ } catch (SchemaNotFoundException e) {
+ logger.debug("Could not find schema in registry by subject name " + searchName, e);
+ continue;
+ }
+ }
+ }
+
} catch (SchemaNotFoundException e) {
- logger.debug("Could not find schema name in registry by id in: + " + schemaPath);
+ logger.debug("Could not find schema name in registry by id in: " + schemaPath);
}
- JsonNode completeSchema = null;
- if(subjectsJson == null) {
+
+ // Get all couples (subject name, version) for a given schema ID
+ // GET /schemas/ids/{int: id}/versions
+ if(completeSchema == null) {
+ JsonNode subjectsVersions = fetchJsonResponse(schemaPath + "/versions", "schema name");
+
+ if(subjectsVersions != null) {
+ final ArrayNode subjectsVersionsList = (ArrayNode) subjectsVersions;
+ // we want to make sure we get the latest version
+ int maxVersion = 0;
+ String subjectName = null;
+ for (JsonNode subjectVersion: subjectsVersionsList) {
+ int currentVersion = subjectVersion.get(VERSION_FIELD_NAME).asInt();
+ String currentSubjectName = subjectVersion.get(SUBJECT_FIELD_NAME).asText();
+ if(currentVersion > maxVersion) {
+ maxVersion = currentVersion;
+ subjectName = currentSubjectName;
+ }
+ }
+
+ if(subjectName != null) {
+ return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
+ }
+ }
+ }
+
+ // Last resort option: we get the full list of subjects and check one by one to get the complete schema info
+ if(completeSchema == null) {
final JsonNode subjectsAllJson = fetchJsonResponse("/subjects", "subjects array");
final ArrayNode subjectsAllList = (ArrayNode) subjectsAllJson;
for (JsonNode subject: subjectsAllList) {
try {
final String searchName = subject.asText();
- completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
- break;
- } catch (SchemaNotFoundException e) {
- continue;
- }
- }
- } else {
- final ArrayNode subjectsList = (ArrayNode) subjectsJson;
- for (JsonNode subject: subjectsList) {
- try {
- final String searchName = subject.asText();
- completeSchema = postJsonResponse("/subjects/" + searchName, responseJson, "schema id: " + schemaId);
+ completeSchema = postJsonResponse("/subjects/" + searchName, schemaJson, "schema id: " + schemaId);
break;
} catch (SchemaNotFoundException e) {
continue;
@@ -158,12 +195,23 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
}
if(completeSchema == null) {
- throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
+ throw new SchemaNotFoundException("Could not get schema with id: " + schemaId);
}
return createRecordSchema(completeSchema);
}
+ private RecordSchema createRecordSchema(final String name, final int version, final int id, final String schema) throws SchemaNotFoundException {
+ try {
+ final Schema avroSchema = new Schema.Parser().parse(schema);
+ final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(name).id((long) id).version(version).build();
+ return AvroTypeUtil.createSchema(avroSchema, schema, schemaId);
+ } catch (final SchemaParseException spe) {
+ throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name
+ + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
+ }
+ }
+
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText();
final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
@@ -173,7 +221,6 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
-
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
@@ -196,6 +243,9 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
+
+ logger.debug("POST JSON response URL {}", url);
+
final WebTarget webTarget = client.target(url);
Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE);
for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
@@ -204,26 +254,40 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final Response response = builder.post(Entity.json(schema.toString()));
final int responseCode = response.getStatus();
- if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
- continue;
- }
+ switch (Response.Status.fromStatusCode(responseCode)) {
+ case OK:
+ JsonNode jsonResponse = response.readEntity(JsonNode.class);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("JSON Response: {}", jsonResponse);
+ }
- if(responseCode == Response.Status.OK.getStatusCode()) {
- return response.readEntity(JsonNode.class);
+ return jsonResponse;
+
+ case NOT_FOUND:
+ logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl);
+ continue;
+
+ default:
+ errorMessage = response.readEntity(String.class);
+ continue;
}
}
- throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: "
+ throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription
+ + " from any of the Confluent Schema Registry URL's provided; failure response message: "
+ errorMessage);
}
- private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException {
+ private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException {
String errorMessage = null;
for (final String baseUrl : baseUrls) {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
+ logger.debug("GET JSON response URL {}", url);
+
final WebTarget webTarget = client.target(url);
Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON);
for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
@@ -232,20 +296,28 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final Response response = builder.get();
final int responseCode = response.getStatus();
- if (responseCode == Response.Status.OK.getStatusCode()) {
- return response.readEntity(JsonNode.class);
- }
+ switch (Response.Status.fromStatusCode(responseCode)) {
+ case OK:
+ JsonNode jsonResponse = response.readEntity(JsonNode.class);
- if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
- throw new SchemaNotFoundException("Could not find Schema with " + schemaDescription + " from the Confluent Schema Registry located at " + baseUrl);
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("JSON Response {}", jsonResponse);
+ }
+
+ return jsonResponse;
- if (errorMessage == null) {
- errorMessage = response.readEntity(String.class);
+ case NOT_FOUND:
+ logger.debug("Could not find Schema {} from Registry {}", schemaDescription, baseUrl);
+ continue;
+
+ default:
+ errorMessage = response.readEntity(String.class);
+ continue;
}
}
- throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
+ throw new SchemaNotFoundException("Failed to retrieve Schema with " + schemaDescription
+ + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage);
}
private String getTrimmedBase(String baseUrl) {