You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by GitBox <gi...@apache.org> on 2022/05/23 13:33:36 UTC

[GitHub] [unomi] jkevan commented on a diff in pull request #426: UNOMI-571: JSON Schema extensions system

jkevan commented on code in PR #426:
URL: https://github.com/apache/unomi/pull/426#discussion_r879462837


##########
extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java:
##########
@@ -50,86 +48,98 @@ public class SchemaServiceImpl implements SchemaService {
 
     ObjectMapper objectMapper = new ObjectMapper();
 
-    private final Map<String, JsonSchemaWrapper> predefinedUnomiJSONSchemaById = new HashMap<>();
-    private Map<String, JsonSchemaWrapper> schemasById = new HashMap<>();
+    /**
+     *  Schemas provided by Unomi runtime bundles in /META-INF/cxs/schemas/...
+     */
+    private final ConcurrentMap<String, JsonSchemaWrapper> predefinedUnomiJSONSchemaById = new ConcurrentHashMap<>();
+    /**
+     * All Unomi schemas indexed by URI
+     */
+    private final ConcurrentMap<String, JsonSchemaWrapper> schemasById = new ConcurrentHashMap<>();
+    /**
+     * Available extensions indexed by key:schema URI to be extended, value: list of schema extension URIs
+     */
+    private final ConcurrentMap<String, Set<String>> extensions = new ConcurrentHashMap<>();
 
     private Integer jsonSchemaRefreshInterval = 1000;
     private ScheduledFuture<?> scheduledFuture;
 
-    private BundleContext bundleContext;
     private PersistenceService persistenceService;
-    private SchedulerService schedulerService;
     private JsonSchemaFactory jsonSchemaFactory;
 
+    // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
+    private ScheduledExecutorService scheduler;
+    //private SchedulerService schedulerService;
 
-    @Override
-    public PartialList<Metadata> getJsonSchemaMetadatas(int offset, int size, String sortBy) {
-        PartialList<JsonSchemaWrapper> items = persistenceService.getAllItems(JsonSchemaWrapper.class, offset, size, sortBy);
-        List<Metadata> details = new LinkedList<>();
-        for (JsonSchemaWrapper definition : items.getList()) {
-            details.add(definition.getMetadata());
-        }
-        return new PartialList<>(details, items.getOffset(), items.getPageSize(), items.getTotalSize(), items.getTotalSizeRelation());
-    }
 
     @Override
     public boolean isValid(String data, String schemaId) {
-        JsonSchema jsonSchema = null;
-        JsonNode jsonNode = null;
+        JsonSchema jsonSchema;
+        JsonNode jsonNode;
 
         try {
             jsonNode = objectMapper.readTree(data);
             jsonSchema = jsonSchemaFactory.getSchema(new URI(schemaId));
         } catch (Exception e) {
-            logger.error("Failed to process data to validate because {} - Set SchemaServiceImpl at DEBUG level for more detail ", e.getMessage());
+            logger.error("Schema validation failed because: {} - Set SchemaServiceImpl at DEBUG level for more detail ", e.getMessage());

Review Comment:
   Thx it have been updated !



##########
extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java:
##########
@@ -50,86 +48,98 @@ public class SchemaServiceImpl implements SchemaService {
 
     ObjectMapper objectMapper = new ObjectMapper();
 
-    private final Map<String, JsonSchemaWrapper> predefinedUnomiJSONSchemaById = new HashMap<>();
-    private Map<String, JsonSchemaWrapper> schemasById = new HashMap<>();
+    /**
+     *  Schemas provided by Unomi runtime bundles in /META-INF/cxs/schemas/...
+     */
+    private final ConcurrentMap<String, JsonSchemaWrapper> predefinedUnomiJSONSchemaById = new ConcurrentHashMap<>();
+    /**
+     * All Unomi schemas indexed by URI
+     */
+    private final ConcurrentMap<String, JsonSchemaWrapper> schemasById = new ConcurrentHashMap<>();
+    /**
+     * Available extensions indexed by key:schema URI to be extended, value: list of schema extension URIs
+     */
+    private final ConcurrentMap<String, Set<String>> extensions = new ConcurrentHashMap<>();
 
     private Integer jsonSchemaRefreshInterval = 1000;
     private ScheduledFuture<?> scheduledFuture;
 
-    private BundleContext bundleContext;
     private PersistenceService persistenceService;
-    private SchedulerService schedulerService;
     private JsonSchemaFactory jsonSchemaFactory;
 
+    // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
+    private ScheduledExecutorService scheduler;
+    //private SchedulerService schedulerService;
 
-    @Override
-    public PartialList<Metadata> getJsonSchemaMetadatas(int offset, int size, String sortBy) {
-        PartialList<JsonSchemaWrapper> items = persistenceService.getAllItems(JsonSchemaWrapper.class, offset, size, sortBy);
-        List<Metadata> details = new LinkedList<>();
-        for (JsonSchemaWrapper definition : items.getList()) {
-            details.add(definition.getMetadata());
-        }
-        return new PartialList<>(details, items.getOffset(), items.getPageSize(), items.getTotalSize(), items.getTotalSizeRelation());
-    }
 
     @Override
     public boolean isValid(String data, String schemaId) {
-        JsonSchema jsonSchema = null;
-        JsonNode jsonNode = null;
+        JsonSchema jsonSchema;
+        JsonNode jsonNode;
 
         try {
             jsonNode = objectMapper.readTree(data);
             jsonSchema = jsonSchemaFactory.getSchema(new URI(schemaId));
         } catch (Exception e) {
-            logger.error("Failed to process data to validate because {} - Set SchemaServiceImpl at DEBUG level for more detail ", e.getMessage());
+            logger.error("Schema validation failed because: {} - Set SchemaServiceImpl at DEBUG level for more detail ", e.getMessage());
             logger.debug("full error",e);
             return false;
         }
 
         if (jsonNode == null) {
-            logger.warn("No data to validate");
+            // no data to validate
             return false;
         }
 
         if (jsonSchema == null) {
-            logger.warn("No schema found for {}", schemaId);
+            logger.warn("Schema validation failed because: Schema not found {}", schemaId);
+            return false;
+        }
+
+        Set<ValidationMessage> validationMessages;
+        try {
+            validationMessages = jsonSchema.validate(jsonNode);
+        } catch (Exception e) {
+            logger.error("Schema validation failed because: {} - Set SchemaServiceImpl at DEBUG level for more detail ", e.getMessage());

Review Comment:
   Thx it have been updated !



##########
extensions/json-schema/services/src/main/java/org/apache/unomi/schema/impl/SchemaServiceImpl.java:
##########
@@ -139,103 +149,192 @@ public void saveSchema(String schema) {
     public boolean deleteSchema(String schemaId) {
         // forbidden to delete predefined Unomi schemas
         if (!predefinedUnomiJSONSchemaById.containsKey(schemaId)) {
-            schemasById.remove(schemaId);
+            // remove persisted schema
             return persistenceService.remove(schemaId, JsonSchemaWrapper.class);
         }
         return false;
     }
 
     @Override
     public void loadPredefinedSchema(InputStream schemaStream) throws IOException {
-        String jsonSchema = IOUtils.toString(schemaStream);
-
-        // check that schema is valid and get the id
-        JsonNode schemaNode = jsonSchemaFactory.getSchema(jsonSchema).getSchemaNode();
-        String schemaId = schemaNode.get("$id").asText();
-        String target = schemaNode.at("/self/target").asText();
-        JsonSchemaWrapper jsonSchemaWrapper = new JsonSchemaWrapper(schemaId, jsonSchema, target);
-
-        predefinedUnomiJSONSchemaById.put(schemaId, jsonSchemaWrapper);
-        schemasById.put(schemaId, jsonSchemaWrapper);
+        String schema = IOUtils.toString(schemaStream);
+        JsonSchemaWrapper jsonSchemaWrapper = buildJsonSchemaWrapper(schema);
+        predefinedUnomiJSONSchemaById.put(jsonSchemaWrapper.getItemId(), jsonSchemaWrapper);
     }
 
     @Override
     public boolean unloadPredefinedSchema(InputStream schemaStream) {
         JsonNode schemaNode = jsonSchemaFactory.getSchema(schemaStream).getSchemaNode();
         String schemaId = schemaNode.get("$id").asText();
+        return predefinedUnomiJSONSchemaById.remove(schemaId) != null;
+    }
+
+    private JsonSchemaWrapper buildJsonSchemaWrapper(String schema) {
+        JsonSchema jsonSchema = jsonSchemaFactory.getSchema(schema);
+        JsonNode schemaNode = jsonSchema.getSchemaNode();
+
+        String schemaId = schemaNode.get("$id").asText();
+        String target = schemaNode.at("/self/target").asText();
+        String name = schemaNode.at("/self/name").asText();
+        String extendsSchemaId = schemaNode.at("/self/extends").asText();
 
-        return predefinedUnomiJSONSchemaById.remove(schemaId) != null && schemasById.remove(schemaId) != null;
+        if ("events".equals(target) && !name.matches("[_A-Za-z][_0-9A-Za-z]*")) {
+            throw new IllegalArgumentException(
+                    "The \"/self/name\" value should match the following regular expression [_A-Za-z][_0-9A-Za-z]* for the Json schema on events");
+        }
+
+        return new JsonSchemaWrapper(schemaId, schema, target, extendsSchemaId, new Date());
     }
 
-    @Override
-    public JsonSchemaWrapper getSchema(String schemaId) {
-        return schemasById.get(schemaId);
+    private void refreshJSONSchemas() {
+        // use local variable to avoid concurrency issues.
+        Map<String, JsonSchemaWrapper> schemasByIdReloaded = new HashMap<>();
+        schemasByIdReloaded.putAll(predefinedUnomiJSONSchemaById);
+        schemasByIdReloaded.putAll(persistenceService.getAllItems(JsonSchemaWrapper.class).stream().collect(Collectors.toMap(Item::getItemId, s -> s)));
+
+        // flush cache if size is different (can be new schema or deleted schemas)
+        boolean changes = schemasByIdReloaded.size() != schemasById.size();
+        // check for modifications
+        if (!changes) {
+            for (JsonSchemaWrapper reloadedSchema : schemasByIdReloaded.values()) {
+                JsonSchemaWrapper oldSchema = schemasById.get(reloadedSchema.getItemId());
+                if (oldSchema == null || !oldSchema.getTimeStamp().equals(reloadedSchema.getTimeStamp())) {
+                    changes = true;
+                    break;
+                }
+            }
+        }
+
+        if (changes) {
+            schemasById.clear();
+            schemasById.putAll(schemasByIdReloaded);
+
+            initExtensions(schemasByIdReloaded);
+            initJsonSchemaFactory();
+        }
     }
 
-    private URIFetcher getUriFetcher() {
-        return uri -> {
-            logger.debug("Fetching schema {}", uri);
-            JsonSchemaWrapper jsonSchemaWrapper = schemasById.get(uri.toString());
-            if (jsonSchemaWrapper == null) {
-                logger.error("Couldn't find schema {}", uri);
-                return null;
+    private void initExtensions(Map<String, JsonSchemaWrapper> schemas) {
+        Map<String, Set<String>> extensionsReloaded = new HashMap<>();
+        // lookup extensions
+        List<JsonSchemaWrapper> schemaExtensions = schemas.values()
+                .stream()
+                .filter(jsonSchemaWrapper -> StringUtils.isNotBlank(jsonSchemaWrapper.getExtendsSchemaId()))
+                .collect(Collectors.toList());
+
+        // build new in RAM extensions map
+        for (JsonSchemaWrapper extension : schemaExtensions) {
+            String extendedSchemaId = extension.getExtendsSchemaId();
+            if (!extension.getItemId().equals(extendedSchemaId)) {
+                if (!extensionsReloaded.containsKey(extendedSchemaId)) {
+                    extensionsReloaded.put(extendedSchemaId, new HashSet<>());
+                }
+                extensionsReloaded.get(extendedSchemaId).add(extension.getItemId());
+            } else {
+                logger.warn("A schema cannot extends himself, please fix your schema definition for schema: {}", extendedSchemaId);
             }
-            return IOUtils.toInputStream(jsonSchemaWrapper.getSchema());
-        };
+        }
+
+        extensions.clear();
+        extensions.putAll(extensionsReloaded);
     }
 
-    private void refreshJSONSchemas() {
-        schemasById = new HashMap<>();
-        schemasById.putAll(predefinedUnomiJSONSchemaById);
+    private String checkForExtensions(String id, String schema) throws JsonProcessingException {

Review Comment:
   Thx it have been updated !



##########
itests/src/test/java/org/apache/unomi/itests/ContextServletIT.java:
##########
@@ -136,29 +139,32 @@ public void setUp() throws InterruptedException {
         keepTrying("Profile " + TEST_PROFILE_ID + " not found in the required time", () -> profileService.load(TEST_PROFILE_ID),
                 Objects::nonNull, DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
 
-        keepTrying("Couldn't find json schema endpoint", () -> get(JSONSCHEMA_URL, List.class), Objects::nonNull, DEFAULT_TRYING_TIMEOUT,
-                DEFAULT_TRYING_TRIES);
+        // create schemas required for tests
+        schemaService.saveSchema(resourceAsString(TEST_EVENT_TYPE_SCHEMA));
+        schemaService.saveSchema(resourceAsString(FLOAT_PROPERTY_EVENT_TYPE_SCHEMA));
+        keepTrying("Couldn't find json schemas",
+                () -> schemaService.getInstalledJsonSchemaIds(),
+                (schemaIds) -> (schemaIds.contains("https://unomi.apache.org/schemas/json/events/floatPropertyType/1-0-0") &&
+                        schemaIds.contains("https://unomi.apache.org/schemas/json/events/testEventType/1-0-0")),
+                DEFAULT_TRYING_TIMEOUT, DEFAULT_TRYING_TRIES);
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws InterruptedException {
         TestUtils.removeAllEvents(definitionsService, persistenceService);
         TestUtils.removeAllSessions(definitionsService, persistenceService);
         TestUtils.removeAllProfiles(definitionsService, persistenceService);
         profileService.delete(profile.getItemId(), false);
         segmentService.removeSegmentDefinition(SEGMENT_ID, false);
 
-        String encodedString = Base64.getEncoder()
-                .encodeToString("https://unomi.apache.org/schemas/json/events/testEventType/1-0-0".getBytes());
-        delete(JSONSCHEMA_URL + "/" + encodedString);
-
-        encodedString = Base64.getEncoder()
-                .encodeToString("https://unomi.apache.org/schemas/json/events/floatPropertyType/1-0-0".getBytes());
-        delete(JSONSCHEMA_URL + "/" + encodedString);
-
-        encodedString = Base64.getEncoder()
-                .encodeToString("https://unomi.apache.org/schemas/json/events/floatPropertyType/1-0-0".getBytes());
-        delete(JSONSCHEMA_URL + "/" + encodedString);
+        // cleanup schemas
+        schemaService.deleteSchema("https://unomi.apache.org/schemas/json/events/testEventType/1-0-0");
+        schemaService.deleteSchema("https://unomi.apache.org/schemas/json/events/floatPropertyType/1-0-0");
+        keepTrying("Couldn't find json schemas",

Review Comment:
   Thx it have been updated !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@unomi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org