Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:46 UTC

[20/50] [abbrv] nifi git commit: NIFI-810: Merged master into branch

NIFI-810: Merged master into branch


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0636f0e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0636f0e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0636f0e7

Branch: refs/heads/NIFI-655
Commit: 0636f0e731cd28299edd3a6e9db90de5045ab662
Parents: 8e2308b d63cd6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 25 11:02:40 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:02:40 2015 -0400

----------------------------------------------------------------------
 .../src/main/asciidoc/administration-guide.adoc |   4 +-
 .../src/main/java/MyProcessor.java              |  11 +-
 .../nifi/processors/avro/ConvertAvroToJSON.java |  67 ++++-
 .../processors/avro/TestConvertAvroToJSON.java  |  47 ++-
 .../processors/aws/AbstractAWSProcessor.java    |   2 +-
 .../nifi/processors/aws/s3/DeleteS3Object.java  |  98 ++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/aws/s3/TestDeleteS3Object.java   | 141 +++++++++
 .../nifi/controller/FlowUnmarshaller.java       |  77 -----
 .../src/main/resources/FlowConfiguration.xsd    |   2 +-
 .../src/main/resources/bin/nifi.sh              |  96 +++---
 .../canvas/new-controller-service-dialog.jsp    |   1 -
 .../partials/canvas/new-processor-dialog.jsp    |   1 -
 .../canvas/new-reporting-task-dialog.jsp        |   1 -
 .../css/new-controller-service-dialog.css       |   9 -
 .../main/webapp/css/new-processor-dialog.css    |   9 -
 .../webapp/css/new-reporting-task-dialog.css    |   9 -
 .../webapp/js/nf/canvas/nf-canvas-toolbox.js    |  60 ++--
 .../src/main/webapp/js/nf/canvas/nf-settings.js | 140 +++++----
 .../processors/kite/AbstractKiteProcessor.java  |  11 +-
 .../nifi/processors/kite/ConvertCSVToAvro.java  | 296 ++++++++++---------
 .../processors/kite/TestCSVToAvroProcessor.java |  39 +++
 .../nifi-standard-prioritizers/pom.xml          |   4 +
 .../PriorityAttributePrioritizer.java           |   7 +-
 .../PriorityAttributePrioritizerTest.java       |  17 +-
 .../nifi-standard-processors/pom.xml            |   9 +
 .../nifi/processors/standard/ExecuteSQL.java    |   9 +-
 .../nifi/processors/standard/InvokeHTTP.java    |   1 +
 .../nifi/processors/standard/ListenHTTP.java    | 105 ++++---
 .../standard/PutDistributedMapCache.java        | 252 ++++++++++++++++
 .../servlets/ContentAcknowledgmentServlet.java  |   3 +-
 .../standard/servlets/ListenHTTPServlet.java    |   8 +-
 .../processors/standard/util/JdbcCommon.java    |  70 ++++-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/standard/TestGetFile.java   |  21 +-
 .../standard/TestPutDistributedMapCache.java    | 277 +++++++++++++++++
 .../standard/util/TestJdbcCommon.java           |  42 +++
 .../standard/util/TestJdbcTypesDerby.java       | 133 +++++++++
 .../standard/util/TestJdbcTypesH2.java          | 149 ++++++++++
 pom.xml                                         |   2 +-
 40 files changed, 1725 insertions(+), 507 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index b214427,f0ba71a..f0f1630
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@@ -35,7 -36,8 +38,7 @@@ import org.apache.nifi.annotation.behav
  import org.apache.nifi.annotation.behavior.SupportsBatching;
  import org.apache.nifi.annotation.behavior.WritesAttribute;
  import org.apache.nifi.annotation.documentation.CapabilityDescription;
--import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.components.PropertyDescriptor;
  import org.apache.nifi.flowfile.FlowFile;
  import org.apache.nifi.flowfile.attributes.CoreAttributes;
  import org.apache.nifi.processor.AbstractProcessor;
@@@ -47,8 -50,7 +51,7 @@@ import org.apache.nifi.processor.io.Str
  
  @SideEffectFree
  @SupportsBatching
- @Tags({ "json", "avro", "binary" })
 -@Tags({"json", "avro", "binary"})
 +@InputRequirement(Requirement.INPUT_REQUIRED)
  @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
      + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
      + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "

http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6f126aa,ea84daa..43b33ff
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@@ -30,8 -30,7 +30,9 @@@ import org.apache.avro.Schema
  import org.apache.avro.file.CodecFactory;
  import org.apache.avro.file.DataFileWriter;
  import org.apache.avro.generic.GenericData.Record;
+ import org.apache.commons.lang3.StringEscapeUtils;
 +import org.apache.nifi.annotation.behavior.InputRequirement;
 +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
  import org.apache.nifi.annotation.documentation.CapabilityDescription;
  import org.apache.nifi.annotation.documentation.Tags;
  import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@@ -68,114 -66,118 +69,108 @@@ public class ConvertCSVToAvro extends A
  
      private static final Validator CHAR_VALIDATOR = new Validator() {
          @Override
--        public ValidationResult validate(String subject, String input,
--                ValidationContext context) {
++        public ValidationResult validate(String subject, String input, ValidationContext context) {
+             // Allows special, escaped characters as input, which are then unescaped and converted to a single character.
+             // Examples for special characters: \t (or \u0009), \f.
+             input = unescapeString(input);
+ 
              return new ValidationResult.Builder()
--                    .subject(subject)
--                    .input(input)
-                     .explanation("Only single characters are supported")
-                     .valid(input.length() == 1)
 -                    .explanation("Only non-null single characters are supported")
 -                    .valid(input.length() == 1 && input.charAt(0) != 0)
--                    .build();
++                .subject(subject)
++                .input(input)
++                .explanation("Only non-null single characters are supported")
++                .valid(input.length() == 1 && input.charAt(0) != 0)
++                .build();
          }
      };
  
      private static final Relationship SUCCESS = new Relationship.Builder()
--            .name("success")
--            .description("Avro content that was converted successfully from CSV")
--            .build();
++        .name("success")
++        .description("Avro content that was converted successfully from CSV")
++        .build();
  
      private static final Relationship FAILURE = new Relationship.Builder()
--            .name("failure")
--            .description("CSV content that could not be processed")
--            .build();
++        .name("failure")
++        .description("CSV content that could not be processed")
++        .build();
  
      private static final Relationship INCOMPATIBLE = new Relationship.Builder()
--            .name("incompatible")
--            .description("CSV content that could not be converted")
--            .build();
++        .name("incompatible")
++        .description("CSV content that could not be converted")
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor SCHEMA
--            = new PropertyDescriptor.Builder()
--            .name("Record schema")
--            .description("Outgoing Avro schema for each record created from a CSV row")
--            .addValidator(SCHEMA_VALIDATOR)
--            .expressionLanguageSupported(true)
--            .required(true)
--            .build();
++    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
++        .name("Record schema")
++        .description("Outgoing Avro schema for each record created from a CSV row")
++        .addValidator(SCHEMA_VALIDATOR)
++        .expressionLanguageSupported(true)
++        .required(true)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor CHARSET
--            = new PropertyDescriptor.Builder()
--            .name("CSV charset")
--            .description("Character set for CSV files")
--            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
--            .defaultValue(DEFAULTS.charset)
--            .build();
++    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
++        .name("CSV charset")
++        .description("Character set for CSV files")
++        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
++        .defaultValue(DEFAULTS.charset)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor DELIMITER
--            = new PropertyDescriptor.Builder()
--            .name("CSV delimiter")
--            .description("Delimiter character for CSV records")
--            .addValidator(CHAR_VALIDATOR)
--            .defaultValue(DEFAULTS.delimiter)
--            .build();
++    static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
++        .name("CSV delimiter")
++        .description("Delimiter character for CSV records")
++        .addValidator(CHAR_VALIDATOR)
++        .defaultValue(DEFAULTS.delimiter)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor QUOTE
--            = new PropertyDescriptor.Builder()
--            .name("CSV quote character")
--            .description("Quote character for CSV values")
--            .addValidator(CHAR_VALIDATOR)
--            .defaultValue(DEFAULTS.quote)
--            .build();
++    static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
++        .name("CSV quote character")
++        .description("Quote character for CSV values")
++        .addValidator(CHAR_VALIDATOR)
++        .defaultValue(DEFAULTS.quote)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor ESCAPE
--            = new PropertyDescriptor.Builder()
--            .name("CSV escape character")
--            .description("Escape character for CSV values")
--            .addValidator(CHAR_VALIDATOR)
--            .defaultValue(DEFAULTS.escape)
--            .build();
++    static final PropertyDescriptor ESCAPE = new PropertyDescriptor.Builder()
++        .name("CSV escape character")
++        .description("Escape character for CSV values")
++        .addValidator(CHAR_VALIDATOR)
++        .defaultValue(DEFAULTS.escape)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor HAS_HEADER
--            = new PropertyDescriptor.Builder()
--            .name("Use CSV header line")
--            .description("Whether to use the first line as a header")
--            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
--            .defaultValue(String.valueOf(DEFAULTS.useHeader))
--            .build();
++    static final PropertyDescriptor HAS_HEADER = new PropertyDescriptor.Builder()
++        .name("Use CSV header line")
++        .description("Whether to use the first line as a header")
++        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
++        .defaultValue(String.valueOf(DEFAULTS.useHeader))
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor LINES_TO_SKIP
--            = new PropertyDescriptor.Builder()
--            .name("Lines to skip")
--            .description("Number of lines to skip before reading header or data")
--            .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
--            .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
--            .build();
--
--    private static final List<PropertyDescriptor> PROPERTIES
--            = ImmutableList.<PropertyDescriptor>builder()
--            .addAll(AbstractKiteProcessor.getProperties())
--            .add(SCHEMA)
--            .add(CHARSET)
--            .add(DELIMITER)
--            .add(QUOTE)
--            .add(ESCAPE)
--            .add(HAS_HEADER)
--            .add(LINES_TO_SKIP)
--            .build();
--
--    private static final Set<Relationship> RELATIONSHIPS
--            = ImmutableSet.<Relationship>builder()
--            .add(SUCCESS)
--            .add(FAILURE)
--            .add(INCOMPATIBLE)
--            .build();
++    static final PropertyDescriptor LINES_TO_SKIP = new PropertyDescriptor.Builder()
++        .name("Lines to skip")
++        .description("Number of lines to skip before reading header or data")
++        .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
++        .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
++        .build();
++
++    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor> builder()
++        .addAll(AbstractKiteProcessor.getProperties())
++        .add(SCHEMA)
++        .add(CHARSET)
++        .add(DELIMITER)
++        .add(QUOTE)
++        .add(ESCAPE)
++        .add(HAS_HEADER)
++        .add(LINES_TO_SKIP)
++        .build();
++
++    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
++        .add(SUCCESS)
++        .add(FAILURE)
++        .add(INCOMPATIBLE)
++        .build();
  
      // Immutable configuration
      @VisibleForTesting
@@@ -196,26 -198,26 +191,26 @@@
          super.setDefaultConfiguration(context);
  
          this.props = new CSVProperties.Builder()
--                .charset(context.getProperty(CHARSET).getValue())
--                .delimiter(context.getProperty(DELIMITER).getValue())
--                .quote(context.getProperty(QUOTE).getValue())
--                .escape(context.getProperty(ESCAPE).getValue())
--                .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
--                .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
--                .build();
++            .charset(context.getProperty(CHARSET).getValue())
++            .delimiter(context.getProperty(DELIMITER).getValue())
++            .quote(context.getProperty(QUOTE).getValue())
++            .escape(context.getProperty(ESCAPE).getValue())
++            .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
++            .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
++            .build();
      }
  
      @Override
      public void onTrigger(ProcessContext context, final ProcessSession session)
--            throws ProcessException {
++        throws ProcessException {
          FlowFile incomingCSV = session.get();
          if (incomingCSV == null) {
              return;
          }
  
          String schemaProperty = context.getProperty(SCHEMA)
--                .evaluateAttributeExpressions(incomingCSV)
--                .getValue();
++            .evaluateAttributeExpressions(incomingCSV)
++            .getValue();
          final Schema schema;
          try {
              schema = getSchema(schemaProperty, DefaultConfiguration.get());
@@@ -225,78 -227,85 +220,87 @@@
              return;
          }
  
--        final DataFileWriter<Record> writer = new DataFileWriter<>(
--                AvroUtil.newDatumWriter(schema, Record.class));
--        writer.setCodec(CodecFactory.snappyCodec());
++        try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
++            writer.setCodec(CodecFactory.snappyCodec());
  
--        try {
--            final LongHolder written = new LongHolder(0L);
--            final FailureTracker failures = new FailureTracker();
--
--            FlowFile badRecords = session.clone(incomingCSV);
--            FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
--                @Override
--                public void process(InputStream in, OutputStream out) throws IOException {
--                    try (CSVFileReader<Record> reader = new CSVFileReader<>(
++            try {
++                final LongHolder written = new LongHolder(0L);
++                final FailureTracker failures = new FailureTracker();
++
++                FlowFile badRecords = session.clone(incomingCSV);
++                FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
++                    @Override
++                    public void process(InputStream in, OutputStream out) throws IOException {
++                        try (CSVFileReader<Record> reader = new CSVFileReader<>(
                              in, props, schema, Record.class)) {
--                        reader.initialize();
--                        try (DataFileWriter<Record> w = writer.create(schema, out)) {
--                            while (reader.hasNext()) {
--                                try {
--                                    Record record = reader.next();
--                                    w.append(record);
--                                    written.incrementAndGet();
--                                } catch (DatasetRecordException e) {
--                                    failures.add(e);
++                            reader.initialize();
++                            try (DataFileWriter<Record> w = writer.create(schema, out)) {
++                                while (reader.hasNext()) {
++                                    try {
++                                        Record record = reader.next();
++                                        w.append(record);
++                                        written.incrementAndGet();
++                                    } catch (DatasetRecordException e) {
++                                        failures.add(e);
++                                    }
                                  }
                              }
                          }
                      }
--                }
--            });
++                });
  
--            long errors = failures.count();
++                long errors = failures.count();
  
--            session.adjustCounter("Converted records", written.get(),
++                session.adjustCounter("Converted records", written.get(),
                      false /* update only if file transfer is successful */);
--            session.adjustCounter("Conversion errors", errors,
++                session.adjustCounter("Conversion errors", errors,
                      false /* update only if file transfer is successful */);
  
--            if (written.get() > 0L) {
--                session.transfer(outgoingAvro, SUCCESS);
++                if (written.get() > 0L) {
++                    session.transfer(outgoingAvro, SUCCESS);
  
--                if (errors > 0L) {
--                    getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
--                            new Object[] { errors, errors + written.get() });
--                    badRecords = session.putAttribute(
++                    if (errors > 0L) {
++                        getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
++                            new Object[] {errors, errors + written.get()});
++                        badRecords = session.putAttribute(
                              badRecords, "errors", failures.summary());
--                    session.transfer(badRecords, INCOMPATIBLE);
--                } else {
--                    session.remove(badRecords);
--                }
++                        session.transfer(badRecords, INCOMPATIBLE);
++                    } else {
++                        session.remove(badRecords);
++                    }
  
--            } else {
--                session.remove(outgoingAvro);
++                } else {
++                    session.remove(outgoingAvro);
  
--                if (errors > 0L) {
--                    getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
--                            new Object[] { errors, errors });
--                    badRecords = session.putAttribute(
++                    if (errors > 0L) {
++                        getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
++                            new Object[] {errors, errors});
++                        badRecords = session.putAttribute(
                              badRecords, "errors", failures.summary());
--                } else {
--                    badRecords = session.putAttribute(
++                    } else {
++                        badRecords = session.putAttribute(
                              badRecords, "errors", "No incoming records");
++                    }
++
++                    session.transfer(badRecords, FAILURE);
                  }
  
--                session.transfer(badRecords, FAILURE);
++            } catch (ProcessException | DatasetIOException e) {
++                getLogger().error("Failed reading or writing", e);
++                session.transfer(incomingCSV, FAILURE);
++            } catch (DatasetException e) {
++                getLogger().error("Failed to read FlowFile", e);
++                session.transfer(incomingCSV, FAILURE);
              }
 -
 -        } catch (ProcessException | DatasetIOException e) {
 -            getLogger().error("Failed reading or writing", e);
 -            session.transfer(incomingCSV, FAILURE);
 -        } catch (DatasetException e) {
 -            getLogger().error("Failed to read FlowFile", e);
 -            session.transfer(incomingCSV, FAILURE);
++        } catch (final IOException ioe) {
++            throw new RuntimeException("Unable to close Avro Writer", ioe);
+         }
+     }
  
-         } catch (ProcessException | DatasetIOException e) {
-             getLogger().error("Failed reading or writing", e);
-             session.transfer(incomingCSV, FAILURE);
-         } catch (DatasetException e) {
-             getLogger().error("Failed to read FlowFile", e);
-             session.transfer(incomingCSV, FAILURE);
+     private static String unescapeString(String input) {
+         if (input.length() > 1) {
+             input = StringEscapeUtils.unescapeJava(input);
          }
+         return input;
      }
  }
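
Beyond the indentation reflow, two substantive changes land in ConvertCSVToAvro above: CHAR_VALIDATOR now runs its input through unescapeString() before the single-character, non-NUL check, so the delimiter, quote, and escape properties accept Java escape sequences such as \t or \u0009; and the outer DataFileWriter is opened in a try-with-resources block so the Avro writer is always closed, with an IOException on close rethrown as a RuntimeException. A standalone sketch of the unescape step, assuming only commons-lang3 on the classpath (the helper mirrors the private method added in the diff):

    import org.apache.commons.lang3.StringEscapeUtils;

    public class UnescapeDemo {
        // Mirrors the private helper added to ConvertCSVToAvro: inputs longer
        // than one character are treated as Java escape sequences.
        static String unescapeString(String input) {
            if (input.length() > 1) {
                input = StringEscapeUtils.unescapeJava(input);
            }
            return input;
        }

        public static void main(String[] args) {
            System.out.println(unescapeString("\\t").equals("\t"));      // true: tab
            System.out.println(unescapeString("\\u0009").equals("\t")); // true: same tab
            System.out.println(unescapeString(",").equals(","));        // true: single chars pass through
        }
    }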

http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 258e122,9ad1703..88b6666
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@@ -63,9 -61,8 +63,9 @@@ import org.eclipse.jetty.servlet.Servle
  import org.eclipse.jetty.util.ssl.SslContextFactory;
  import org.eclipse.jetty.util.thread.QueuedThreadPool;
  
 +@InputRequirement(Requirement.INPUT_FORBIDDEN)
  @Tags({"ingest", "http", "https", "rest", "listen"})
- @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+ @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
  public class ListenHTTP extends AbstractSessionFactoryProcessor {
  
      private Set<Relationship> relationships;
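
The ListenHTTP hunk marks the processor @InputRequirement(Requirement.INPUT_FORBIDDEN), which fits a source processor that accepts no incoming connections, and the description now calls http://{hostname}:{port}/contentListener the default URI rather than the only one, since the base path is configurable. A hedged sketch of delivering one FlowFile to a running listener, assuming an instance on localhost port 8081 with the default base path:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class PostToListenHTTP {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            // Each POST body becomes the content of one FlowFile.
            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/contentListener"))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString("hello nifi"))
                .build();
            HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP " + response.statusCode()); // 200 on success
        }
    }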