Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/19 06:10:07 UTC
[5/6] nifi git commit: NIFI-3857: This closes #1825. Added PartitionRecord processor
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
index 8f64510..af29da5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
@@ -25,35 +25,40 @@
<p>
The IPLookupService is powered by a MaxMind database and can return several different types of enrichment information
about a given IP address. Below is the schema of the Record that is returned by this service (in Avro Schema format).
+ The schema is for a single record that consists of several fields: <code>geo</code>, <code>isp</code>,
+ <code>domainName</code>, <code>connectionType</code>, and <code>anonymousIp</code>. Each of these fields is nullable
+ and will be populated only if the IP address that is searched for has the relevant information in the MaxMind database
+ and if the Controller Service is configured to return such information. Because each of the fields requires a separate
+ lookup in the database, it is advisable to retrieve only those fields that are of value.
</p>
-
+
<code>
<pre>
{
- "name": "ipEnrichment",
+ "name": "enrichmentRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "geo",
- "type": {
+ "type": ["null", {
"name": "cityGeo",
"type": "record",
"fields": [
- { "name": "city", "type": "string" },
- { "name": "accuracy", "type": "int", "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
- { "name": "metroCode", "type": "int" },
- { "name": "timeZone", "type": "string" },
- { "name": "latitude", "type": "double" },
- { "name": "longitude", "type": "double" },
- { "name": "country", "type": {
+ { "name": "city", "type": ["null", "string"] },
+ { "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
+ { "name": "metroCode", "type": ["null", "int"] },
+ { "name": "timeZone", "type": ["null", "string"] },
+ { "name": "latitude", "type": ["null", "double"] },
+ { "name": "longitude", "type": ["null", "double"] },
+ { "name": "country", "type": ["null", {
"type": "record",
"name": "country",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "isoCode", "type": "string" }
]
- } },
+ }] },
{ "name": "subdivisions", "type": {
"type": "array",
"items": {
@@ -66,37 +71,109 @@
}
}
},
- { "name": "continent", "type": "string" },
- { "name": "postalCode", "type": "string" }
+ { "name": "continent", "type": ["null", "string"] },
+ { "name": "postalCode", "type": ["null", "string"] }
]
- }
+ }]
},
{
"name": "isp",
- "type": {
- "name": "ispEnrich",
+ "type": ["null", {
+ "name": "ispEnrich",
"type": "record",
"fields": [
- { "name": "name", "type": "string" },
- { "name": "organization", "type": "string" },
- { "name": "asn", "type": "int" },
- { "name": "asnOrganization", "type": "string" }
+ { "name": "name", "type": ["null", "string"] },
+ { "name": "organization", "type": ["null", "string"] },
+ { "name": "asn", "type": ["null", "int"] },
+ { "name": "asnOrganization", "type": ["null", "string"] }
]
- }
+ }]
},
{
"name": "domainName",
- "type": "string"
+ "type": ["null", "string"]
},
{
"name": "connectionType",
- "type": "string",
+ "type": ["null", "string"],
"doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
+ },
+ {
+ "name": "anonymousIp",
+ "type": ["null", {
+ "name": "anonymousIpType",
+ "type": "record",
+ "fields": [
+ { "name": "anonymous", "type": "boolean" },
+ { "name": "anonymousVpn", "type": "boolean" },
+ { "name": "hostingProvider", "type": "boolean" },
+ { "name": "publicProxy", "type": "boolean" },
+ { "name": "torExitNode", "type": "boolean" }
+ ]
+ }]
}
]
}
</pre>
</code>
- </body>
+
+ <p>
+ While this schema is fairly complex, it is a single record with 5 fields. This makes it quite easy to update
+ an existing schema to allow for this record, by adding a new field to an existing schema and pasting in the schema
+ above as the type.
+ </p>
+
+ <p>
+ For example, suppose that we have an existing schema that is as simple as:
+ </p>
+
+<pre>
+<code>
+<span style="color: #808080;">
+{
+ "name": "ipRecord",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [
+ { "name": "ip", "type": "string" }
+ ]
+}
+</span>
+</code>
+</pre>
+
+ <p>
+ Now, let's suppose that we want to add a new field named <code>enrichment</code> to the above schema.
+ Further, let's say that we want the new <code>enrichment</code> field to be nullable.
+ We can do so by copying and pasting our enrichment schema from above thus:
+ </p>
+
+<pre>
+<code>
+<span style="color: #808080;">
+{
+ "name": "ipRecord",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [
+ { "name": "ip", "type": "string" },</span><span style="color: #191970;">
+ { "name": "enrichment", "type": ["null",
+</span>
+
+ <span style="color: #000000"><Paste Enrichment Schema Here></span>
+
+<span style="color: #191970;">
+ ]</span><span style="color: #808080;">
+ }
+ ]
+}
+</span>
+</code>
+</pre>
+
+
+
+
+ </body>
</html>
\ No newline at end of file
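
As a rough illustration of the documentation change above, the nullable enrichment field can be assembled by wrapping the enrichment schema text in a union with "null". The sketch below uses plain string handling only. The class and method names (NullableEnrichmentSketch, nullableField, ipRecordWith) are hypothetical and not part of NiFi, and the enrichment schema is abbreviated to a stub with no fields.

```java
public class NullableEnrichmentSketch {

    // Wraps a schema (given as JSON text) in a ["null", ...] union so the field may be absent.
    // Hypothetical helper, not a NiFi or Avro API.
    static String nullableField(String fieldName, String schemaJson) {
        return "{ \"name\": \"" + fieldName + "\", \"type\": [\"null\", " + schemaJson + "] }";
    }

    // Builds the ipRecord schema from the documentation with one extra field appended.
    static String ipRecordWith(String extraFieldJson) {
        return "{\n"
            + "  \"name\": \"ipRecord\",\n"
            + "  \"namespace\": \"nifi\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"fields\": [\n"
            + "    { \"name\": \"ip\", \"type\": \"string\" },\n"
            + "    " + extraFieldJson + "\n"
            + "  ]\n"
            + "}";
    }

    public static void main(String[] args) {
        // Stub standing in for the full enrichmentRecord schema (assumption: fields omitted for brevity).
        String enrichmentStub =
            "{ \"name\": \"enrichmentRecord\", \"namespace\": \"nifi\", \"type\": \"record\", \"fields\": [] }";
        System.out.println(ipRecordWith(nullableField("enrichment", enrichmentStub)));
    }
}
```

In practice the stub would be replaced by the full schema from the documentation; the wrapping step stays the same.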
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index a3f9cb8..c1f000b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -30,7 +30,6 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -68,10 +67,10 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
}
@Override
- public WriteResult write(final Record record) throws IOException {
+ public Map<String, String> writeRecord(final Record record) throws IOException {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
datumWriter.write(rec, encoder);
- return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema));
+ return schemaAccessWriter.getAttributes(recordSchema);
}
@Override
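
The hunk above replaces write(Record), which returned a WriteResult, with writeRecord(Record), which returns only the attribute map. The matching change to AbstractRecordSetWriter is not shown in this part of the commit, so the following is only a sketch of one way such a split could work: a base class that presumably owns the record counting and delegates serialization to subclasses. All names here (RecordWriterSketch, CountingWriter, Result, LineWriter) are hypothetical stand-ins, records are simplified to strings, and the cumulative count is an assumption.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public class RecordWriterSketch {

    // Hypothetical stand-in for a result type carrying a record count plus attributes.
    static final class Result {
        final int recordCount;
        final Map<String, String> attributes;

        Result(int recordCount, Map<String, String> attributes) {
            this.recordCount = recordCount;
            this.attributes = attributes;
        }
    }

    // Hypothetical base class: keeps the count, leaves serialization to subclasses.
    static abstract class CountingWriter {
        private int recordCount = 0;

        // Subclasses serialize one record and return only the attributes to attach.
        protected abstract Map<String, String> writeRecord(String record) throws IOException;

        // Assumption: the count reported is the running total of records written so far.
        public final Result write(String record) throws IOException {
            Map<String, String> attributes = writeRecord(record);
            recordCount++;
            return new Result(recordCount, attributes);
        }
    }

    // Minimal subclass that appends each record as a line and attaches no attributes.
    static final class LineWriter extends CountingWriter {
        private final StringBuilder out = new StringBuilder();

        @Override
        protected Map<String, String> writeRecord(String record) {
            out.append(record).append('\n');
            return Collections.emptyMap();
        }

        String output() {
            return out.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        LineWriter writer = new LineWriter();
        writer.write("a");
        Result second = writer.write("b");
        System.out.println(second.recordCount + " records written");
    }
}
```

With this shape, each concrete writer no longer builds its own result object, which is consistent with the WriteResult import being dropped from the writer classes in this commit.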
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index 9bfb4cf..dd15118 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -20,13 +20,13 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
@@ -49,10 +49,10 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
}
@Override
- public WriteResult write(final Record record) throws IOException {
+ public Map<String, String> writeRecord(final Record record) throws IOException {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
dataFileWriter.append(rec);
- return WriteResult.of(1, Collections.emptyMap());
+ return Collections.emptyMap();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 8475a50..f8998f9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -27,7 +27,6 @@ import org.apache.commons.csv.CSVPrinter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -90,14 +89,14 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
}
@Override
- public WriteResult write(final Record record) throws IOException {
+ public Map<String, String> writeRecord(final Record record) throws IOException {
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
}
printer.printRecord(fieldValues);
- return WriteResult.of(1, schemaWriter.getAttributes(recordSchema));
+ return schemaWriter.getAttributes(recordSchema);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index b73ecab..a41412f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -28,7 +28,6 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -97,9 +96,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
}
@Override
- public WriteResult write(final Record record) throws IOException {
+ public Map<String, String> writeRecord(final Record record) throws IOException {
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
- return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
+ return schemaAccess.getAttributes(recordSchema);
}
private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 95f2a73..7012504 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -29,7 +29,6 @@ import java.util.Map;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -60,9 +59,9 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
}
@Override
- public WriteResult write(final Record record) throws IOException {
+ public Map<String, String> writeRecord(final Record record) throws IOException {
write(record, out, getColumnNames(record.getSchema()));
- return WriteResult.of(1, Collections.emptyMap());
+ return Collections.emptyMap();
}
private void write(final Record record, final OutputStream out, final List<String> columnNames) throws IOException {