You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/19 06:10:07 UTC

[5/6] nifi git commit: NIFI-3857: This closes #1825. Added PartitionRecord processor

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
index 8f64510..af29da5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
@@ -25,35 +25,40 @@
       <p>
         The IPLookupService is powered by a MaxMind database and can return several different types of enrichment information
         about a given IP address. Below is the schema of the Record that is returned by this service (in Avro Schema format).
+        The schema is for a single record that consists of several fields: <code>geo</code>, <code>isp</code>,
+        <code>domainName</code>, <code>connectionType</code>, and <code>anonymousIp</code>. Each of these fields is nullable
+        and will be populated only if the IP address that is searched for has the relevant information in the MaxMind database
+        and if the Controller Service is configured to return such information. Because each of the fields requires a separate
+        lookup in the database, it is advisable to retrieve only those fields that are of value.
       </p>
-  
+      
 <code>
 <pre>
 {
-  "name": "ipEnrichment",
+  "name": "enrichmentRecord",
   "namespace": "nifi",
   "type": "record",
   "fields": [
     {
       "name": "geo",
-      "type": {
+      "type": ["null", {
         "name": "cityGeo",
         "type": "record",
         "fields": [
-          { "name": "city", "type": "string" },
-          { "name": "accuracy", "type": "int", "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
-          { "name": "metroCode", "type": "int" },
-          { "name": "timeZone", "type": "string" },
-          { "name": "latitude", "type": "double" },
-          { "name": "longitude", "type": "double" },
-          { "name": "country", "type": {
+          { "name": "city", "type": ["null", "string"] },
+          { "name": "accuracy", "type": ["null", "int"], "doc": "The radius, in kilometers, around the given location, where the IP address is believed to be" },
+          { "name": "metroCode", "type": ["null", "int"] },
+          { "name": "timeZone", "type": ["null", "string"] },
+          { "name": "latitude", "type": ["null", "double"] },
+          { "name": "longitude", "type": ["null", "double"] },
+          { "name": "country", "type": ["null", {
             "type": "record",
             "name": "country",
             "fields": [
               { "name": "name", "type": "string" },
               { "name": "isoCode", "type": "string" }
             ]
-          } },
+          }] },
             { "name": "subdivisions", "type": {
               "type": "array",
               "items": {
@@ -66,37 +71,109 @@
               }
             }
           },
-          { "name": "continent", "type": "string" },
-          { "name": "postalCode", "type": "string" }
+          { "name": "continent", "type": ["null", "string"] },
+          { "name": "postalCode", "type": ["null", "string"] }
         ]
-      }
+      }]
     },
     {
       "name": "isp",
-      "type": {
-          "name": "ispEnrich",
+      "type": ["null", {
+        "name": "ispEnrich",
         "type": "record",
         "fields": [
-          { "name": "name", "type": "string" },
-          { "name": "organization", "type": "string" },
-          { "name": "asn", "type": "int" },
-          { "name": "asnOrganization", "type": "string" }
+          { "name": "name", "type": ["null", "string"] },
+          { "name": "organization", "type": ["null", "string"] },
+          { "name": "asn", "type": ["null", "int"] },
+          { "name": "asnOrganization", "type": ["null", "string"] }
         ]
-    }
+      }]
     },
     {
       "name": "domainName",
-      "type": "string"
+      "type": ["null", "string"]
     },
     {
       "name": "connectionType",
-      "type": "string",
+      "type": ["null", "string"],
       "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
+    },
+    {
+      "name": "anonymousIp",
+      "type": ["null", {
+        "name": "anonymousIpType",
+        "type": "record",
+        "fields": [
+          { "name": "anonymous", "type": "boolean" },
+          { "name": "anonymousVpn", "type": "boolean" },
+          { "name": "hostingProvider", "type": "boolean" },
+          { "name": "publicProxy", "type": "boolean" },
+          { "name": "torExitNode", "type": "boolean" }
+        ]
+      }]
     }
   ]
 }
 </pre>
 </code>
 
-  </body>
+
+  <p>
+    While this schema is fairly complex, it is a single record with 5 fields. This makes it quite easy to update
+    an existing schema to include this record: add a new field to the existing schema and paste in the schema
+    above as that field's type.
+  </p>
+  
+  <p>
+    For example, suppose that we have an existing schema that is as simple as:
+  </p>
+
+<pre>
+<code>
+<span style="color: #808080;">
+{
+  "name": "ipRecord",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    { "name": "ip", "type": "string" }
+  ]
+}
+</span>
+</code>
+</pre>
+
+  <p>
+    Now, let's suppose that we want to add a new field named <code>enrichment</code> to the above schema.
+    Further, let's say that we want the new <code>enrichment</code> field to be nullable.
+    We can do so by copying and pasting our enrichment schema from above, as follows:
+  </p>
+
+<pre>
+<code>
+<span style="color: #808080;">
+{
+  "name": "ipRecord",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    { "name": "ip", "type": "string" },</span><span style="color: #191970;">
+    { "name": "enrichment", "type": ["null",
+</span>
+
+      <span style="color: #000000">&lt;Paste Enrichment Schema Here&gt;</span>
+
+<span style="color: #191970;">
+    ]</span><span style="color: #808080;">
+    }
+  ]
+}
+</span>
+</code>
+</pre>
+
+
+
+
+ </body>
 </html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index a3f9cb8..c1f000b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -30,7 +30,6 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
@@ -68,10 +67,10 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
         datumWriter.write(rec, encoder);
-        return WriteResult.of(1, schemaAccessWriter.getAttributes(recordSchema));
+        return schemaAccessWriter.getAttributes(recordSchema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index 9bfb4cf..dd15118 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -20,13 +20,13 @@ package org.apache.nifi.avro;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 
 public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
@@ -49,10 +49,10 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
         dataFileWriter.append(rec);
-        return WriteResult.of(1, Collections.emptyMap());
+        return Collections.emptyMap();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 8475a50..f8998f9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -27,7 +27,6 @@ import org.apache.commons.csv.CSVPrinter;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -90,14 +89,14 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {
             fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
         }
 
         printer.printRecord(fieldValues);
-        return WriteResult.of(1, schemaWriter.getAttributes(recordSchema));
+        return schemaWriter.getAttributes(recordSchema);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index b73ecab..a41412f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -28,7 +28,6 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -97,9 +96,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
-        return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
+        return schemaAccess.getAttributes(recordSchema);
     }
 
     private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 95f2a73..7012504 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -60,9 +59,9 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
     }
 
     @Override
-    public WriteResult write(final Record record) throws IOException {
+    public Map<String, String> writeRecord(final Record record) throws IOException {
         write(record, out, getColumnNames(record.getSchema()));
-        return WriteResult.of(1, Collections.emptyMap());
+        return Collections.emptyMap();
     }
 
     private void write(final Record record, final OutputStream out, final List<String> columnNames) throws IOException {