You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/25 16:12:22 UTC
[18/19] nifi git commit: NIFI-810: Merged master into branch
NIFI-810: Merged master into branch
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0636f0e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0636f0e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0636f0e7
Branch: refs/heads/master
Commit: 0636f0e731cd28299edd3a6e9db90de5045ab662
Parents: 8e2308b d63cd6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 25 11:02:40 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:02:40 2015 -0400
----------------------------------------------------------------------
.../src/main/asciidoc/administration-guide.adoc | 4 +-
.../src/main/java/MyProcessor.java | 11 +-
.../nifi/processors/avro/ConvertAvroToJSON.java | 67 ++++-
.../processors/avro/TestConvertAvroToJSON.java | 47 ++-
.../processors/aws/AbstractAWSProcessor.java | 2 +-
.../nifi/processors/aws/s3/DeleteS3Object.java | 98 ++++++
.../org.apache.nifi.processor.Processor | 1 +
.../processors/aws/s3/TestDeleteS3Object.java | 141 +++++++++
.../nifi/controller/FlowUnmarshaller.java | 77 -----
.../src/main/resources/FlowConfiguration.xsd | 2 +-
.../src/main/resources/bin/nifi.sh | 96 +++---
.../canvas/new-controller-service-dialog.jsp | 1 -
.../partials/canvas/new-processor-dialog.jsp | 1 -
.../canvas/new-reporting-task-dialog.jsp | 1 -
.../css/new-controller-service-dialog.css | 9 -
.../main/webapp/css/new-processor-dialog.css | 9 -
.../webapp/css/new-reporting-task-dialog.css | 9 -
.../webapp/js/nf/canvas/nf-canvas-toolbox.js | 60 ++--
.../src/main/webapp/js/nf/canvas/nf-settings.js | 140 +++++----
.../processors/kite/AbstractKiteProcessor.java | 11 +-
.../nifi/processors/kite/ConvertCSVToAvro.java | 296 ++++++++++---------
.../processors/kite/TestCSVToAvroProcessor.java | 39 +++
.../nifi-standard-prioritizers/pom.xml | 4 +
.../PriorityAttributePrioritizer.java | 7 +-
.../PriorityAttributePrioritizerTest.java | 17 +-
.../nifi-standard-processors/pom.xml | 9 +
.../nifi/processors/standard/ExecuteSQL.java | 9 +-
.../nifi/processors/standard/InvokeHTTP.java | 1 +
.../nifi/processors/standard/ListenHTTP.java | 105 ++++---
.../standard/PutDistributedMapCache.java | 252 ++++++++++++++++
.../servlets/ContentAcknowledgmentServlet.java | 3 +-
.../standard/servlets/ListenHTTPServlet.java | 8 +-
.../processors/standard/util/JdbcCommon.java | 70 ++++-
.../org.apache.nifi.processor.Processor | 1 +
.../nifi/processors/standard/TestGetFile.java | 21 +-
.../standard/TestPutDistributedMapCache.java | 277 +++++++++++++++++
.../standard/util/TestJdbcCommon.java | 42 +++
.../standard/util/TestJdbcTypesDerby.java | 133 +++++++++
.../standard/util/TestJdbcTypesH2.java | 149 ++++++++++
pom.xml | 2 +-
40 files changed, 1725 insertions(+), 507 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index b214427,f0ba71a..f0f1630
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@@ -35,7 -36,8 +38,7 @@@ import org.apache.nifi.annotation.behav
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
--import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
@@@ -47,8 -50,7 +51,7 @@@ import org.apache.nifi.processor.io.Str
@SideEffectFree
@SupportsBatching
- @Tags({ "json", "avro", "binary" })
-@Tags({"json", "avro", "binary"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6f126aa,ea84daa..43b33ff
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@@ -30,8 -30,7 +30,9 @@@ import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
+ import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@@ -68,114 -66,118 +69,108 @@@ public class ConvertCSVToAvro extends A
private static final Validator CHAR_VALIDATOR = new Validator() {
@Override
-- public ValidationResult validate(String subject, String input,
-- ValidationContext context) {
++ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ // Allows special, escaped characters as input, which is then unescaped and converted to a single character.
+ // Examples for special characters: \t (or \u0009), \f.
+ input = unescapeString(input);
+
return new ValidationResult.Builder()
-- .subject(subject)
-- .input(input)
- .explanation("Only single characters are supported")
- .valid(input.length() == 1)
- .explanation("Only non-null single characters are supported")
- .valid(input.length() == 1 && input.charAt(0) != 0)
-- .build();
++ .subject(subject)
++ .input(input)
++ .explanation("Only non-null single characters are supported")
++ .valid(input.length() == 1 && input.charAt(0) != 0)
++ .build();
}
};
private static final Relationship SUCCESS = new Relationship.Builder()
-- .name("success")
-- .description("Avro content that was converted successfully from CSV")
-- .build();
++ .name("success")
++ .description("Avro content that was converted successfully from CSV")
++ .build();
private static final Relationship FAILURE = new Relationship.Builder()
-- .name("failure")
-- .description("CSV content that could not be processed")
-- .build();
++ .name("failure")
++ .description("CSV content that could not be processed")
++ .build();
private static final Relationship INCOMPATIBLE = new Relationship.Builder()
-- .name("incompatible")
-- .description("CSV content that could not be converted")
-- .build();
++ .name("incompatible")
++ .description("CSV content that could not be converted")
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor SCHEMA
-- = new PropertyDescriptor.Builder()
-- .name("Record schema")
-- .description("Outgoing Avro schema for each record created from a CSV row")
-- .addValidator(SCHEMA_VALIDATOR)
-- .expressionLanguageSupported(true)
-- .required(true)
-- .build();
++ static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
++ .name("Record schema")
++ .description("Outgoing Avro schema for each record created from a CSV row")
++ .addValidator(SCHEMA_VALIDATOR)
++ .expressionLanguageSupported(true)
++ .required(true)
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor CHARSET
-- = new PropertyDescriptor.Builder()
-- .name("CSV charset")
-- .description("Character set for CSV files")
-- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-- .defaultValue(DEFAULTS.charset)
-- .build();
++ static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
++ .name("CSV charset")
++ .description("Character set for CSV files")
++ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
++ .defaultValue(DEFAULTS.charset)
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor DELIMITER
-- = new PropertyDescriptor.Builder()
-- .name("CSV delimiter")
-- .description("Delimiter character for CSV records")
-- .addValidator(CHAR_VALIDATOR)
-- .defaultValue(DEFAULTS.delimiter)
-- .build();
++ static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
++ .name("CSV delimiter")
++ .description("Delimiter character for CSV records")
++ .addValidator(CHAR_VALIDATOR)
++ .defaultValue(DEFAULTS.delimiter)
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor QUOTE
-- = new PropertyDescriptor.Builder()
-- .name("CSV quote character")
-- .description("Quote character for CSV values")
-- .addValidator(CHAR_VALIDATOR)
-- .defaultValue(DEFAULTS.quote)
-- .build();
++ static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
++ .name("CSV quote character")
++ .description("Quote character for CSV values")
++ .addValidator(CHAR_VALIDATOR)
++ .defaultValue(DEFAULTS.quote)
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor ESCAPE
-- = new PropertyDescriptor.Builder()
-- .name("CSV escape character")
-- .description("Escape character for CSV values")
-- .addValidator(CHAR_VALIDATOR)
-- .defaultValue(DEFAULTS.escape)
-- .build();
++ static final PropertyDescriptor ESCAPE = new PropertyDescriptor.Builder()
++ .name("CSV escape character")
++ .description("Escape character for CSV values")
++ .addValidator(CHAR_VALIDATOR)
++ .defaultValue(DEFAULTS.escape)
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor HAS_HEADER
-- = new PropertyDescriptor.Builder()
-- .name("Use CSV header line")
-- .description("Whether to use the first line as a header")
-- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-- .defaultValue(String.valueOf(DEFAULTS.useHeader))
-- .build();
++ static final PropertyDescriptor HAS_HEADER = new PropertyDescriptor.Builder()
++ .name("Use CSV header line")
++ .description("Whether to use the first line as a header")
++ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
++ .defaultValue(String.valueOf(DEFAULTS.useHeader))
++ .build();
@VisibleForTesting
-- static final PropertyDescriptor LINES_TO_SKIP
-- = new PropertyDescriptor.Builder()
-- .name("Lines to skip")
-- .description("Number of lines to skip before reading header or data")
-- .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
-- .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
-- .build();
--
-- private static final List<PropertyDescriptor> PROPERTIES
-- = ImmutableList.<PropertyDescriptor>builder()
-- .addAll(AbstractKiteProcessor.getProperties())
-- .add(SCHEMA)
-- .add(CHARSET)
-- .add(DELIMITER)
-- .add(QUOTE)
-- .add(ESCAPE)
-- .add(HAS_HEADER)
-- .add(LINES_TO_SKIP)
-- .build();
--
-- private static final Set<Relationship> RELATIONSHIPS
-- = ImmutableSet.<Relationship>builder()
-- .add(SUCCESS)
-- .add(FAILURE)
-- .add(INCOMPATIBLE)
-- .build();
++ static final PropertyDescriptor LINES_TO_SKIP = new PropertyDescriptor.Builder()
++ .name("Lines to skip")
++ .description("Number of lines to skip before reading header or data")
++ .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
++ .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
++ .build();
++
++ private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor> builder()
++ .addAll(AbstractKiteProcessor.getProperties())
++ .add(SCHEMA)
++ .add(CHARSET)
++ .add(DELIMITER)
++ .add(QUOTE)
++ .add(ESCAPE)
++ .add(HAS_HEADER)
++ .add(LINES_TO_SKIP)
++ .build();
++
++ private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
++ .add(SUCCESS)
++ .add(FAILURE)
++ .add(INCOMPATIBLE)
++ .build();
// Immutable configuration
@VisibleForTesting
@@@ -196,26 -198,26 +191,26 @@@
super.setDefaultConfiguration(context);
this.props = new CSVProperties.Builder()
-- .charset(context.getProperty(CHARSET).getValue())
-- .delimiter(context.getProperty(DELIMITER).getValue())
-- .quote(context.getProperty(QUOTE).getValue())
-- .escape(context.getProperty(ESCAPE).getValue())
-- .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
-- .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
-- .build();
++ .charset(context.getProperty(CHARSET).getValue())
++ .delimiter(context.getProperty(DELIMITER).getValue())
++ .quote(context.getProperty(QUOTE).getValue())
++ .escape(context.getProperty(ESCAPE).getValue())
++ .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
++ .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
++ .build();
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
-- throws ProcessException {
++ throws ProcessException {
FlowFile incomingCSV = session.get();
if (incomingCSV == null) {
return;
}
String schemaProperty = context.getProperty(SCHEMA)
-- .evaluateAttributeExpressions(incomingCSV)
-- .getValue();
++ .evaluateAttributeExpressions(incomingCSV)
++ .getValue();
final Schema schema;
try {
schema = getSchema(schemaProperty, DefaultConfiguration.get());
@@@ -225,78 -227,85 +220,87 @@@
return;
}
-- final DataFileWriter<Record> writer = new DataFileWriter<>(
-- AvroUtil.newDatumWriter(schema, Record.class));
-- writer.setCodec(CodecFactory.snappyCodec());
++ try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
++ writer.setCodec(CodecFactory.snappyCodec());
-- try {
-- final LongHolder written = new LongHolder(0L);
-- final FailureTracker failures = new FailureTracker();
--
-- FlowFile badRecords = session.clone(incomingCSV);
-- FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
-- @Override
-- public void process(InputStream in, OutputStream out) throws IOException {
-- try (CSVFileReader<Record> reader = new CSVFileReader<>(
++ try {
++ final LongHolder written = new LongHolder(0L);
++ final FailureTracker failures = new FailureTracker();
++
++ FlowFile badRecords = session.clone(incomingCSV);
++ FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
++ @Override
++ public void process(InputStream in, OutputStream out) throws IOException {
++ try (CSVFileReader<Record> reader = new CSVFileReader<>(
in, props, schema, Record.class)) {
-- reader.initialize();
-- try (DataFileWriter<Record> w = writer.create(schema, out)) {
-- while (reader.hasNext()) {
-- try {
-- Record record = reader.next();
-- w.append(record);
-- written.incrementAndGet();
-- } catch (DatasetRecordException e) {
-- failures.add(e);
++ reader.initialize();
++ try (DataFileWriter<Record> w = writer.create(schema, out)) {
++ while (reader.hasNext()) {
++ try {
++ Record record = reader.next();
++ w.append(record);
++ written.incrementAndGet();
++ } catch (DatasetRecordException e) {
++ failures.add(e);
++ }
}
}
}
}
-- }
-- });
++ });
-- long errors = failures.count();
++ long errors = failures.count();
-- session.adjustCounter("Converted records", written.get(),
++ session.adjustCounter("Converted records", written.get(),
false /* update only if file transfer is successful */);
-- session.adjustCounter("Conversion errors", errors,
++ session.adjustCounter("Conversion errors", errors,
false /* update only if file transfer is successful */);
-- if (written.get() > 0L) {
-- session.transfer(outgoingAvro, SUCCESS);
++ if (written.get() > 0L) {
++ session.transfer(outgoingAvro, SUCCESS);
-- if (errors > 0L) {
-- getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
-- new Object[] { errors, errors + written.get() });
-- badRecords = session.putAttribute(
++ if (errors > 0L) {
++ getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
++ new Object[] {errors, errors + written.get()});
++ badRecords = session.putAttribute(
badRecords, "errors", failures.summary());
-- session.transfer(badRecords, INCOMPATIBLE);
-- } else {
-- session.remove(badRecords);
-- }
++ session.transfer(badRecords, INCOMPATIBLE);
++ } else {
++ session.remove(badRecords);
++ }
-- } else {
-- session.remove(outgoingAvro);
++ } else {
++ session.remove(outgoingAvro);
-- if (errors > 0L) {
-- getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
-- new Object[] { errors, errors });
-- badRecords = session.putAttribute(
++ if (errors > 0L) {
++ getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
++ new Object[] {errors, errors});
++ badRecords = session.putAttribute(
badRecords, "errors", failures.summary());
-- } else {
-- badRecords = session.putAttribute(
++ } else {
++ badRecords = session.putAttribute(
badRecords, "errors", "No incoming records");
++ }
++
++ session.transfer(badRecords, FAILURE);
}
-- session.transfer(badRecords, FAILURE);
++ } catch (ProcessException | DatasetIOException e) {
++ getLogger().error("Failed reading or writing", e);
++ session.transfer(incomingCSV, FAILURE);
++ } catch (DatasetException e) {
++ getLogger().error("Failed to read FlowFile", e);
++ session.transfer(incomingCSV, FAILURE);
}
-
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed reading or writing", e);
- session.transfer(incomingCSV, FAILURE);
- } catch (DatasetException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(incomingCSV, FAILURE);
++ } catch (final IOException ioe) {
++ throw new RuntimeException("Unable to close Avro Writer", ioe);
+ }
+ }
- } catch (ProcessException | DatasetIOException e) {
- getLogger().error("Failed reading or writing", e);
- session.transfer(incomingCSV, FAILURE);
- } catch (DatasetException e) {
- getLogger().error("Failed to read FlowFile", e);
- session.transfer(incomingCSV, FAILURE);
+ private static String unescapeString(String input) {
+ if (input.length() > 1) {
+ input = StringEscapeUtils.unescapeJava(input);
}
+ return input;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 258e122,9ad1703..88b6666
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@@ -63,9 -61,8 +63,9 @@@ import org.eclipse.jetty.servlet.Servle
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"ingest", "http", "https", "rest", "listen"})
- @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+ @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
public class ListenHTTP extends AbstractSessionFactoryProcessor {
private Set<Relationship> relationships;