You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/03 02:03:50 UTC
nifi git commit: NIFI-5059 Updated MongoDBLookupService to be able to
detect record schemas or take one provided by the user.
Repository: nifi
Updated Branches:
refs/heads/master d4d4ddade -> 22ec069ac
NIFI-5059 Updated MongoDBLookupService to be able to detect record schemas or take one provided by the user.
NIFI-5059 Changed it to use a schema registry.
NIFI-5059 Updated MongoDBLookupService to be a SchemaRegistryService.
NIFI-5059 Added two changes from a code review.
NIFI-5059 Fixed two bad references.
NIFI-5059 Refactored schema strategy handling.
NIFI-5059 Moved schema strategy handling to JsonInferenceSchemaRegistryService.
NIFI-5059 Updated to use new LookupService method.
NIFI-5059 Fixed schema inference bug.
NIFI-5059 Added test for schema text strategy.
NIFI-5059 Incremented version number to make the build work.
NIFI-5059 Fixed a stray 1.7.0 reference.
NIFI-5059 Added getDatabase to client service.
NIFI-5059 Added changes requested in a code review.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2619
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/22ec069a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/22ec069a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/22ec069a
Branch: refs/heads/master
Commit: 22ec069acb39d59388d4adc4965929cfdba8f36b
Parents: d4d4dda
Author: Mike Thomsen <mi...@gmail.com>
Authored: Mon Apr 9 07:28:40 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Jul 2 21:57:50 2018 -0400
----------------------------------------------------------------------
.../nifi-avro-record-utils/pom.xml | 6 +
.../schema/access/InferenceSchemaStrategy.java | 87 +++++++++++
.../nifi/schema/access/SchemaAccessUtils.java | 2 +-
.../JsonInferenceSchemaRegistryService.java | 108 ++++++++++++++
.../serialization/SchemaRegistryService.java | 16 +-
.../nifi/serialization/FakeProcessor.groovy | 44 ++++++
...estJsonInferenceSchemaRegistryService.groovy | 59 ++++++++
.../schema/access/JsonSchemaAccessStrategy.java | 36 +++++
.../nifi/mongodb/MongoDBClientService.java | 4 +
.../nifi-mongodb-services/pom.xml | 12 ++
.../nifi/mongodb/MongoDBControllerService.java | 16 ++
.../nifi/mongodb/MongoDBLookupService.java | 132 ++++++++++++-----
.../nifi/mongodb/MongoDBLookupServiceIT.java | 148 ++++++++++++++++---
.../apache/nifi/mongodb/StubSchemaRegistry.java | 49 ++++++
.../src/test/resources/simple.avsc | 7 +
15 files changed, 663 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
index 864581e..05e62f4 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
@@ -43,6 +43,12 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.8.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
new file mode 100644
index 0000000..6cfd35d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.codehaus.jackson.map.ObjectMapper;
+import sun.misc.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InferenceSchemaStrategy implements JsonSchemaAccessStrategy {
+ private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
+
+ @Override
+ public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ byte[] bytes = IOUtils.readFully(contentStream, -1, true);
+ ObjectMapper mapper = new ObjectMapper();
+
+ return convertSchema(mapper.readValue(bytes, Map.class));
+ }
+
+ protected RecordSchema convertSchema(Map<String, Object> result) {
+ List<RecordField> fields = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : result.entrySet()) {
+
+ RecordField field;
+ if (entry.getValue() instanceof Integer) {
+ field = new RecordField(entry.getKey(), RecordFieldType.INT.getDataType());
+ } else if (entry.getValue() instanceof Long) {
+ field = new RecordField(entry.getKey(), RecordFieldType.LONG.getDataType());
+ } else if (entry.getValue() instanceof Boolean) {
+ field = new RecordField(entry.getKey(), RecordFieldType.BOOLEAN.getDataType());
+ } else if (entry.getValue() instanceof Double) {
+ field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType());
+ } else if (entry.getValue() instanceof Date) {
+ field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType());
+ } else if (entry.getValue() instanceof List) {
+ field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType());
+ } else if (entry.getValue() instanceof Map) {
+ RecordSchema nestedSchema = convertSchema((Map)entry.getValue());
+ RecordDataType rdt = new RecordDataType(nestedSchema);
+ field = new RecordField(entry.getKey(), rdt);
+ } else {
+ field = new RecordField(entry.getKey(), RecordFieldType.STRING.getDataType());
+ }
+ fields.add(field);
+ }
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ @Override
+ public Set<SchemaField> getSuppliedSchemaFields() {
+ return schemaFields;
+ }
+
+ @Override
+ public RecordSchema getSchema(Map<String, String> variables, Map<String, Object> content, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ return convertSchema(content);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index 82ea240..7921dff 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -50,7 +50,7 @@ public class SchemaAccessUtils {
"The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single "
+ "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
+ "This is based on version 3.2.x of the Confluent Schema Registry.");
-
+ public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from Result");
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
new file mode 100644
index 0000000..b3819cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.AvroSchemaTextStrategy;
+import org.apache.nifi.schema.access.InferenceSchemaStrategy;
+import org.apache.nifi.schema.access.JsonSchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNamePropertyStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.INFER_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
+
+public class JsonInferenceSchemaRegistryService extends SchemaRegistryService {
+ private String schemaAccess;
+
+ @OnEnabled
+ public void onEnabled(ConfigurationContext context) {
+ this.storeSchemaAccessStrategy(context);
+ this.schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+ }
+
+ @Override
+ protected AllowableValue getDefaultSchemaAccessStrategy() {
+ return INFER_SCHEMA;
+ }
+
+ @Override
+ protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
+ if (strategy == null) {
+ return null;
+ }
+
+ if (strategy.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
+ final PropertyValue schemaName = context.getProperty(SCHEMA_NAME);
+ final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
+ final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION);
+ return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion);
+ } else if (strategy.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+ return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
+ } else if (strategy.equalsIgnoreCase(INFER_SCHEMA.getValue())) {
+ return new InferenceSchemaStrategy();
+ }
+
+ return null;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>(2);
+
+ final AllowableValue[] strategies = new AllowableValue[] {
+ SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, INFER_SCHEMA
+ };
+
+ properties.add(buildStrategyProperty(strategies));
+
+ properties.add(SCHEMA_REGISTRY);
+ properties.add(SCHEMA_NAME);
+ properties.add(SCHEMA_VERSION);
+ properties.add(SCHEMA_BRANCH_NAME);
+ properties.add(SCHEMA_TEXT);
+
+ return properties;
+ }
+
+ public RecordSchema getSchema(Map<String, String> variables, Map<String, Object> content, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ if (schemaAccess.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue()) || schemaAccess.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
+ return getSchema(variables, readSchema);
+ } else {
+ return ((JsonSchemaAccessStrategy)schemaAccessStrategy).getSchema(variables, content, readSchema);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index b299191..6923f48 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -58,7 +58,7 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_
public abstract class SchemaRegistryService extends AbstractControllerService {
private volatile ConfigurationContext configurationContext;
- private volatile SchemaAccessStrategy schemaAccessStrategy;
+ protected volatile SchemaAccessStrategy schemaAccessStrategy;
private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
@@ -68,16 +68,20 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
}
+ protected PropertyDescriptor buildStrategyProperty(AllowableValue[] values) {
+ return new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+ .allowableValues(values)
+ .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+ .build();
+ }
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(2);
final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
- properties.add(new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
- .allowableValues(strategies)
- .defaultValue(getDefaultSchemaAccessStrategy().getValue())
- .build());
+ properties.add(buildStrategyProperty(strategies));
properties.add(SCHEMA_REGISTRY);
properties.add(SCHEMA_NAME);
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy
new file mode 100644
index 0000000..6cf32b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization
+
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.processor.AbstractProcessor
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSession
+import org.apache.nifi.processor.exception.ProcessException
+
+class FakeProcessor extends AbstractProcessor {
+ static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ .name("Schema Service")
+ .description("")
+ .identifiesControllerService(JsonInferenceSchemaRegistryService.class)
+ .required(true)
+ .build()
+
+ @Override
+ void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return [
+ CLIENT_SERVICE
+ ]
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
new file mode 100644
index 0000000..c930452
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization
+
+import org.apache.nifi.schema.access.SchemaAccessUtils
+import org.apache.nifi.serialization.record.type.RecordDataType
+import org.apache.nifi.util.TestRunners
+import org.junit.Assert
+import org.junit.Test
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY
+
+class TestJsonInferenceSchemaRegistryService {
+ @Test
+ void testInfer() {
+ def runner = TestRunners.newTestRunner(FakeProcessor.class)
+ def service = new JsonInferenceSchemaRegistryService()
+ runner.addControllerService("schemaService", service)
+ runner.setProperty(FakeProcessor.CLIENT_SERVICE, "schemaService")
+ runner.setProperty(service, service.getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()), SchemaAccessUtils.INFER_SCHEMA)
+ runner.enableControllerService(service)
+ runner.assertValid()
+
+ def json = [
+ name: "John Smith",
+ age: 35,
+ contact: [
+ email: "john.smith@example.com",
+ phone: "123-456-7890"
+ ]
+ ]
+
+ def schema = service.getSchema([:], json, null)
+
+ Assert.assertNotNull(schema)
+ def name = schema.getField("name")
+ def age = schema.getField("age")
+ def contact = schema.getField("contact")
+ Assert.assertTrue(name.isPresent())
+ Assert.assertTrue(age.isPresent())
+ Assert.assertTrue(contact.isPresent())
+ Assert.assertTrue(contact.get().dataType instanceof RecordDataType)
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java
new file mode 100644
index 0000000..30dcfe5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface JsonSchemaAccessStrategy extends SchemaAccessStrategy {
+ /**
+ * Get a schema using a Map object instead of an input stream. This is meant to be used with JSON toolkits.
+ *
+ * @param variables Variables which are used to resolve Record Schema via Expression Language.
+ * This can be null or empty.
+ * @param content JSON content in a Map object form.
+ * @param readSchema The schema that was read from the input content, or <code>null</code> if there was none.
+ * @return The RecordSchema if found.
+ */
+ RecordSchema getSchema(Map<String, String> variables, Map<String, Object> content, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
index b0f1618..5a3a4b2 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
@@ -17,6 +17,7 @@
package org.apache.nifi.mongodb;
+import com.mongodb.client.MongoDatabase;
import org.apache.nifi.controller.ControllerService;
import org.bson.Document;
@@ -31,6 +32,7 @@ public interface MongoDBClientService extends ControllerService {
void delete(Document query);
boolean exists(Document query);
Document findOne(Document query);
+ Document findOne(Document query, Document projection);
List<Document> findMany(Document query);
List<Document> findMany(Document query, int limit);
List<Document> findMany(Document query, Document sort, int limit);
@@ -42,4 +44,6 @@ public interface MongoDBClientService extends ControllerService {
void upsert(Document query, Document update);
void dropDatabase();
void dropCollection();
+
+ MongoDatabase getDatabase(String name);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
index c595508..4b54073 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml
@@ -78,6 +78,17 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.8.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
@@ -88,6 +99,7 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
+ <exclude>src/test/resources/simple.avsc</exclude>
<exclude>src/test/resources/test.csv</exclude>
<exclude>src/test/resources/test.properties</exclude>
<exclude>src/test/resources/test.xml</exclude>
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index 0faed0d..a1f9b2b 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -80,6 +80,17 @@ public class MongoDBControllerService extends AbstractMongoDBControllerService i
}
@Override
+ public Document findOne(Document query, Document projection) {
+ MongoCursor<Document> cursor = projection != null
+ ? this.col.find(query).projection(projection).limit(1).iterator()
+ : this.col.find(query).limit(1).iterator();
+ Document retVal = cursor.tryNext();
+ cursor.close();
+
+ return retVal;
+ }
+
+ @Override
public List<Document> findMany(Document query) {
return findMany(query, null, -1);
}
@@ -153,4 +164,9 @@ public class MongoDBControllerService extends AbstractMongoDBControllerService i
this.col.drop();
this.col = null;
}
+
+ @Override
+ public MongoDatabase getDatabase(String name) {
+ return mongoClient.getDatabase(name);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
index fba2287..6c4905a 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
@@ -20,22 +20,20 @@ package org.apache.nifi.mongodb;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -43,6 +41,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.INFER_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
@Tags({"mongo", "mongodb", "lookup", "record"})
@CapabilityDescription(
@@ -52,31 +61,49 @@ import java.util.Set;
"The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " +
"then the entire MongoDB result document minus the _id field will be returned as a record."
)
-public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> {
+public class MongoDBLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Object> {
+ /** Required property that identifies the {@link MongoDBClientService} this lookup service uses. */
+ public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
+ .name("mongo-lookup-client-service")
+ .displayName("Client Service")
+ .description("A MongoDB controller service to use with this lookup service.")
+ .required(true)
+ .identifiesControllerService(MongoDBClientService.class)
+ .build();
+ /** Optional name of the single result field to return; when empty, the whole result document is returned as a record. */
public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder()
- .name("mongo-lookup-value-field")
- .displayName("Lookup Value Field")
- .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " +
- "MongoDB result document minus the _id field will be returned as a record.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ .name("mongo-lookup-value-field")
+ .displayName("Lookup Value Field")
+ .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " +
+ "MongoDB result document minus the _id field will be returned as a record.")
+ // Replaces the former NON_EMPTY_VALIDATOR; presumably so an empty value can be configured - confirm.
+ .addValidator(Validator.VALID)
+ .required(false)
+ .build();
+    /**
+     * Optional projection, written as a JSON document, that is parsed when the service is
+     * enabled and passed to the client service's {@code findOne} call.
+     */
+    public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
+        .name("mongo-lookup-projection")
+        .displayName("Projection")
+        .description("Specifies a projection for limiting which fields will be returned.")
+        // A free-text property needs at least one validator; with none registered NiFi reports
+        // any configured value as invalid. Validator.VALID mirrors LOOKUP_VALUE_FIELD.
+        // TODO(review): consider a JSON-aware validator so malformed input is rejected before enable.
+        .addValidator(Validator.VALID)
+        .required(false)
+        .build();
private String lookupValueField;
- private static final List<PropertyDescriptor> lookupDescriptors;
-
- static {
- lookupDescriptors = new ArrayList<>();
- lookupDescriptors.addAll(descriptors);
- lookupDescriptors.add(LOOKUP_VALUE_FIELD);
+ /**
+ * Performs a lookup without caller-supplied context by delegating to
+ * {@link #lookup(Map, Map)} with a new, empty context map.
+ *
+ * @param coordinates lookup coordinates, forwarded unchanged
+ * @return whatever the two-argument lookup returns for these coordinates
+ * @throws LookupFailureException propagated from the two-argument lookup
+ */
+ @Override
+ public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+ /*
+ * Unless the user hard-coded schema.name or schema.text into the schema access options, this is going
+ * to force schema detection.
+ */
+ return lookup(coordinates, new HashMap<>());
}
@Override
- public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
- Map<String, Object> clean = new HashMap<>();
- clean.putAll(coordinates);
+ public Optional<Object> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+ Map<String, Object> clean = coordinates.entrySet().stream()
+ .filter(e -> !schemaNameProperty.equals(String.format("${%s}", e.getKey())))
+ .collect(Collectors.toMap(
+ e -> e.getKey(),
+ e -> e.getValue()
+ ));
Document query = new Document(clean);
if (coordinates.size() == 0) {
@@ -84,23 +111,15 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
}
try {
- Document result = this.findOne(query);
+ Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query);
if(result == null) {
return Optional.empty();
} else if (!StringUtils.isEmpty(lookupValueField)) {
return Optional.ofNullable(result.get(lookupValueField));
} else {
- final List<RecordField> fields = new ArrayList<>();
+ RecordSchema schema = loadSchema(context, result);
- for (String key : result.keySet()) {
- if (key.equals("_id")) {
- continue;
- }
- fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
- }
-
- final RecordSchema schema = new SimpleRecordSchema(fields);
return Optional.ofNullable(new MapRecord(schema, result));
}
} catch (Exception ex) {
@@ -109,10 +128,32 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
}
}
- @Override
+    /**
+     * Resolves the record schema for a looked-up document by delegating to {@code getSchema}.
+     *
+     * @param context attribute map handed to schema resolution
+     * @param doc the MongoDB result document the schema should describe
+     * @return the schema returned by {@code getSchema}
+     * @throws IllegalStateException if schema resolution fails; the original failure is kept as the cause
+     */
+    private RecordSchema loadSchema(Map<String, String> context, Document doc) {
+        try {
+            return getSchema(context, doc, null);
+        } catch (Exception ex) {
+            // Returning null here would hide the real failure and hand a null schema to the
+            // MapRecord constructor at the call site; fail with the cause attached instead.
+            throw new IllegalStateException("Unable to determine a record schema for the lookup result", ex);
+        }
+    }
+
+ // Parsed form of the PROJECTION property; assigned in onEnabled and read by lookup.
+ private volatile Document projection;
+ // Client service resolved from the CONTROLLER_SERVICE property in onEnabled; lookup queries go through it.
+ private MongoDBClientService controllerService;
+ // Configured value of the SCHEMA_NAME property, captured in onEnabled and used to filter lookup coordinates.
+ private String schemaNameProperty;
+
+    /**
+     * Captures the configured property values when the service is enabled, then runs the
+     * parent class's enable logic.
+     *
+     * @param context configuration holding this service's property values
+     */
@OnEnabled
- public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+    public void onEnabled(final ConfigurationContext context) {
this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue();
+        this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBClientService.class);
+
+        this.schemaNameProperty = context.getProperty(SchemaAccessUtils.SCHEMA_NAME).getValue();
+
+        String configuredProjection = context.getProperty(PROJECTION).isSet()
+            ? context.getProperty(PROJECTION).getValue()
+            : null;
+        // Assign unconditionally: the same instance can be disabled, reconfigured and enabled
+        // again, and a projection parsed for an earlier configuration must not survive once
+        // the property has been cleared.
+        this.projection = StringUtils.isBlank(configuredProjection)
+            ? null
+            : Document.parse(configuredProjection);
+
super.onEnabled(context);
}
@@ -128,6 +169,25 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
+    /**
+     * Builds the list of properties this service exposes: the schema access settings
+     * followed by the MongoDB-specific ones.
+     *
+     * @return an unmodifiable list of the supported property descriptors
+     */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return lookupDescriptors;
+        // The access strategy is re-declared so that only these three strategies are offered.
+        final PropertyDescriptor schemaAccessStrategy = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+            .allowableValues(new AllowableValue[] {SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, INFER_SCHEMA})
+            .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+            .build();
+
+        final List<PropertyDescriptor> supported = new ArrayList<>();
+        supported.add(schemaAccessStrategy);
+
+        // Remaining schema-related settings.
+        supported.add(SCHEMA_REGISTRY);
+        supported.add(SCHEMA_NAME);
+        supported.add(SCHEMA_VERSION);
+        supported.add(SCHEMA_BRANCH_NAME);
+        supported.add(SCHEMA_TEXT);
+
+        // Settings specific to the MongoDB lookup.
+        supported.add(CONTROLLER_SERVICE);
+        supported.add(LOOKUP_VALUE_FIELD);
+        supported.add(PROJECTION);
+
+        return Collections.unmodifiableList(supported);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
index dcb3fb1..2c7f522 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
@@ -17,8 +17,14 @@
package org.apache.nifi.mongodb;
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
@@ -27,8 +33,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.Calendar;
+import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -38,36 +48,49 @@ public class MongoDBLookupServiceIT {
private TestRunner runner;
private MongoDBLookupService service;
+ private MongoDBControllerService controllerService;
@Before
public void before() throws Exception {
runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class);
service = new MongoDBLookupService();
+ // "Client Service" is the lookup service under test; "Client Service 2" is the Mongo client service it is pointed at below.
+ controllerService = new MongoDBControllerService();
runner.addControllerService("Client Service", service);
- runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME);
- runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME);
- runner.setProperty(service, MongoDBLookupService.URI, "mongodb://localhost:27017");
+ runner.addControllerService("Client Service 2", controllerService);
+ runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service");
+ runner.setProperty(controllerService, MongoDBControllerService.DATABASE_NAME, DB_NAME);
+ runner.setProperty(controllerService, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
+ runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017");
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
+ runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2");
+ // Register a stub schema registry for the lookup service to use.
+ SchemaRegistry registry = new StubSchemaRegistry();
+ runner.addControllerService("registry", registry);
+ // Overrides the "message" value set a few lines above with an empty string.
+ runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
+ runner.setProperty(service, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+ // Enable the registry and the client service first, then the lookup service itself.
+ runner.enableControllerService(registry);
+ runner.enableControllerService(controllerService);
+ runner.enableControllerService(service);
}
@After
- public void after() throws Exception {
- service.dropDatabase();
- service.onDisable();
+ public void after() {
+ // Database cleanup and shutdown go through the client service, not the lookup service.
+ controllerService.dropDatabase();
+ controllerService.onDisable();
}
@Test
- public void testInit() throws Exception {
- runner.enableControllerService(service);
+ public void testInit() {
+ // The service is already enabled in before(); only its validity is checked here.
runner.assertValid(service);
+
}
@Test
public void testLookupSingle() throws Exception {
+ runner.disableControllerService(service);
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
runner.enableControllerService(service);
- Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
- service.insert(document);
+ Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
+ controllerService.insert(document);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
@@ -78,7 +101,7 @@ public class MongoDBLookupServiceIT {
Map<String, Object> clean = new HashMap<>();
clean.putAll(criteria);
- service.delete(new Document(clean));
+ controllerService.delete(new Document(clean));
try {
result = service.lookup(criteria);
@@ -90,11 +113,85 @@ public class MongoDBLookupServiceIT {
}
+    /**
+     * Looks up one inserted document twice: once with {@code schema.name} supplied in the
+     * lookup context, and once through the single-argument lookup with no context.
+     */
@Test
+    public void testWithSchemaRegistry() throws Exception {
+        runner.assertValid();
+
+        controllerService.insert(new Document()
+            .append("username", "john.smith")
+            .append("password", "testing1234")
+        );
+
+        Map<String, Object> criteria = new HashMap<>();
+        criteria.put("username", "john.smith");
+        Map<String, String> context = new HashMap<>();
+        context.put("schema.name", "user");
+        // Parameterized to match the declared return type of lookup(); the original used a raw Optional.
+        Optional<Object> result = service.lookup(criteria, context);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertNotNull(result.get());
+        MapRecord record = (MapRecord)result.get();
+
+        Assert.assertEquals("john.smith", record.getAsString("username"));
+        Assert.assertEquals("testing1234", record.getAsString("password"));
+
+        /*
+         * Test falling back on schema detection if a user doesn't specify the context argument
+         */
+        result = service.lookup(criteria);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertNotNull(result.get());
+        record = (MapRecord)result.get();
+
+        Assert.assertEquals("john.smith", record.getAsString("username"));
+        Assert.assertEquals("testing1234", record.getAsString("password"));
+    }
+
+    /**
+     * Verifies that a lookup returns a result when the schema is supplied as text through the
+     * {@code schema.text} entry of the lookup context.
+     */
+    @Test
+    public void testSchemaTextStrategy() throws Exception {
+        // Close the resource stream once it has been read, and decode it as UTF-8 rather than
+        // with the platform default charset. Fully-qualified names avoid touching the imports.
+        final String schemaText;
+        try (java.io.InputStream in = getClass().getResourceAsStream("/simple.avsc")) {
+            schemaText = new String(IOUtils.toByteArray(in), java.nio.charset.StandardCharsets.UTF_8);
+        }
+
+        runner.disableControllerService(service);
+        runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
+        runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }");
+        runner.setProperty(service, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(service, SchemaAccessUtils.SCHEMA_TEXT, "${schema.text}");
+        runner.enableControllerService(service);
+        runner.assertValid();
+
+        controllerService.insert(new Document().append("msg", "Testing1234"));
+
+        Map<String, Object> criteria = new HashMap<>();
+        criteria.put("msg", "Testing1234");
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("schema.text", schemaText);
+
+        Optional<Object> results = service.lookup(criteria, attrs);
+        Assert.assertNotNull(results);
+        Assert.assertTrue(results.isPresent());
+    }
+
+ @Test
public void testLookupRecord() throws Exception {
+ runner.disableControllerService(service);
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
+ runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }");
runner.enableControllerService(service);
- Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
- service.insert(document);
+
+ Date d = new Date();
+ Timestamp ts = new Timestamp(new Date().getTime());
+ List list = Arrays.asList("a", "b", "c", "d", "e");
+
+ controllerService.insert(new Document()
+ .append("uuid", "x-y-z")
+ .append("dateField", d)
+ .append("longField", 10000L)
+ .append("stringField", "Hello, world")
+ .append("timestampField", ts)
+ .append("decimalField", Double.MAX_VALUE / 2.0)
+ .append("subrecordField", new Document()
+ .append("nestedString", "test")
+ .append("nestedLong", new Long(1000)))
+ .append("arrayField", list)
+ );
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
@@ -103,12 +200,24 @@ public class MongoDBLookupServiceIT {
Assert.assertNotNull("The value was null.", result.get());
Assert.assertTrue("The value was wrong.", result.get() instanceof MapRecord);
MapRecord record = (MapRecord)result.get();
- Assert.assertEquals("The value was wrong.", "Hello, world", record.getAsString("message"));
- Assert.assertEquals("The value was wrong.", "x-y-z", record.getAsString("uuid"));
+ RecordSchema subSchema = ((RecordDataType)record.getSchema().getField("subrecordField").get().getDataType()).getChildSchema();
+
+ Assert.assertEquals("The value was wrong.", "Hello, world", record.getValue("stringField"));
+ Assert.assertEquals("The value was wrong.", "x-y-z", record.getValue("uuid"));
+ Assert.assertEquals(new Long(10000), record.getValue("longField"));
+ Assert.assertEquals((Double.MAX_VALUE / 2.0), record.getValue("decimalField"));
+ Assert.assertEquals(d, record.getValue("dateField"));
+ Assert.assertEquals(ts.getTime(), ((Date)record.getValue("timestampField")).getTime());
+
+ Record subRecord = record.getAsRecord("subrecordField", subSchema);
+ Assert.assertNotNull(subRecord);
+ Assert.assertEquals("test", subRecord.getValue("nestedString"));
+ Assert.assertEquals(new Long(1000), subRecord.getValue("nestedLong"));
+ Assert.assertEquals(list, record.getValue("arrayField"));
Map<String, Object> clean = new HashMap<>();
clean.putAll(criteria);
- service.delete(new Document(clean));
+ controllerService.delete(new Document(clean));
try {
result = service.lookup(criteria);
@@ -120,10 +229,9 @@ public class MongoDBLookupServiceIT {
}
@Test
- public void testServiceParameters() throws Exception {
- runner.enableControllerService(service);
- Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
- service.insert(document);
+ public void testServiceParameters() {
+ Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
+ controllerService.insert(document);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java
new file mode 100644
index 0000000..5e17258
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.mongodb;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test stub {@link SchemaRegistry} that answers every request with the same fixed
+ * two-field schema.
+ */
+public class StubSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+    /**
+     * Returns a schema with the string fields {@code username} and {@code password}.
+     *
+     * @param schemaIdentifier ignored; the same schema is returned for every identifier
+     * @return the fixed two-field schema
+     */
+    @Override
+    public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("username", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("password", RecordFieldType.STRING.getDataType()));
+        return new SimpleRecordSchema(fields);
+    }
+
+    /**
+     * Reports {@link SchemaField#SCHEMA_NAME} as the only schema field this registry supplies.
+     *
+     * @return a new mutable set containing only {@code SCHEMA_NAME}
+     */
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        // Built with a plain HashSet: double-brace initialization would create an anonymous
+        // subclass that holds a reference to this service on every call.
+        Set<SchemaField> supplied = new HashSet<>();
+        supplied.add(SchemaField.SCHEMA_NAME);
+        return supplied;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/22ec069a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc
new file mode 100644
index 0000000..2bea9cb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc
@@ -0,0 +1,7 @@
+{
+ "type": "record",
+ "name": "SimpleRecord",
+ "fields": [
+ { "name": "msg", "type": "string" }
+ ]
+}
\ No newline at end of file