Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/25 16:03:01 UTC

[01/41] nifi git commit: [NIFI-774] Create a DeleteS3Object Processor

Repository: nifi
Updated Branches:
  refs/heads/NIFI-810-InputRequirement 8e2308b78 -> 0636f0e73


[NIFI-774] Create a DeleteS3Object Processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1dbd376
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1dbd376
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1dbd376

Branch: refs/heads/NIFI-810-InputRequirement
Commit: d1dbd37629baeb9a1570f47f7583d07f54b38b5c
Parents: e4e263c
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Thu Jul 23 13:13:15 2015 +0900
Committer: Yuu ISHIKAWA <yu...@gmail.com>
Committed: Tue Sep 1 22:35:11 2015 +0900

----------------------------------------------------------------------
 .../nifi/processors/aws/s3/DeleteS3Object.java  | 108 +++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/aws/s3/TestDeleteS3Object.java   | 137 +++++++++++++++++++
 3 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d1dbd376/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
new file mode 100644
index 0000000..3be7a15
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteVersionRequest;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+@SupportsBatching
+@SeeAlso({PutS3Object.class})
+@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
+@CapabilityDescription("Deletes an object from an Amazon S3 bucket. " +
+        "The object's existence is checked before deletion.")
+public class DeleteS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
+            .name("Version")
+            .description("The Version of the Object to delete")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
+                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 s3 = getClient();
+        try {
+            // Check that the key exists; getObjectMetadata throws an
+            // exception if there is no such key
+            s3.getObjectMetadata(bucket, key);
+
+            // Delete the object, or a specific version of it, from Amazon S3
+            if (versionId == null) {
+                final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key);
+                s3.deleteObject(r);
+            } else {
+                final DeleteVersionRequest r = new DeleteVersionRequest(bucket, key, versionId);
+                s3.deleteVersion(r);
+            }
+        } catch (final AmazonServiceException ase) {
+            getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully deleted S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1dbd376/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4f2405c..d0d1e73 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,6 +14,7 @@
 # limitations under the License.
 org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
+org.apache.nifi.processors.aws.s3.DeleteS3Object
 org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1dbd376/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
new file mode 100644
index 0000000..dfe6edb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.*;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+
+@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+public class TestDeleteS3Object {
+
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
+
+    // The region and bucket used for this test; oneTimeSetUp creates the bucket before the tests run.
+    private static final String TEST_REGION = "ap-northeast-1";
+    private static final String TEST_BUCKET = "test-bucket-00000000-0000-0000-0000-1234567890123";
+
+    @BeforeClass
+    public static void oneTimeSetUp() {
+        // Creates a new bucket for this test
+        try {
+            PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
+            AmazonS3Client client = new AmazonS3Client(credentials);
+            CreateBucketRequest request = new CreateBucketRequest(TEST_BUCKET, TEST_REGION);
+            client.createBucket(request);
+        } catch (final AmazonS3Exception e) {
+            System.out.println(TEST_BUCKET + " already exists.");
+        } catch (final IOException e) {
+            System.out.println(CREDENTIALS_FILE + " doesn't exist.");
+        }
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws IOException {
+        // Deletes the bucket created for this test
+        PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
+        AmazonS3Client client = new AmazonS3Client(credentials);
+        DeleteBucketRequest dbr = new DeleteBucketRequest(TEST_BUCKET);
+        client.deleteBucket(dbr);
+    }
+
+    @Test
+    public void testSimpleDelete() throws IOException {
+        // Prepares for this test
+        uploadTestFile("hello.txt");
+
+        DeleteS3Object deleter = new DeleteS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(deleter);
+        runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
+        runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
+        runner.setProperty(DeleteS3Object.KEY, "hello.txt");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testDeleteFolder() throws IOException {
+        // Prepares for this test
+        uploadTestFile("folder/1.txt");
+
+        DeleteS3Object deleter = new DeleteS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(deleter);
+        runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
+        runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
+        runner.setProperty(DeleteS3Object.KEY, "folder/1.txt");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "folder/1.txt");
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testTryToDeleteNotExistingFile() throws IOException {
+        DeleteS3Object deleter = new DeleteS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(deleter);
+        runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
+        runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
+        runner.setProperty(DeleteS3Object.BUCKET, "no-such-a-key");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "no-such-a-file");
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    // Uploads a test file
+    private void uploadTestFile(String key) throws IOException {
+        PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
+        AmazonS3Client client = new AmazonS3Client(credentials);
+        URL fileURL = this.getClass().getClassLoader().getResource("hello.txt");
+        File file = new File(fileURL.getPath());
+        PutObjectRequest putRequest = new PutObjectRequest(TEST_BUCKET, key, file);
+        client.putObject(putRequest);
+    }
+}
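
For local runs, the aws-credentials.properties file read by PropertiesCredentials above is a plain Java properties file with two entries; the key names below are what the AWS SDK class expects, and the values are placeholders:

    # ~/aws-credentials.properties
    accessKey=AKIAXXXXXXXXXXXXXXXX
    secretKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx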


[09/41] nifi git commit: Remove `REL_NOT_FOUND`

Posted by ma...@apache.org.
Remove `REL_NOT_FOUND`


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/eb1d6b55
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/eb1d6b55
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/eb1d6b55

Branch: refs/heads/NIFI-810-InputRequirement
Commit: eb1d6b554cc589d7a62b4bf452bf4ebfbad1f8ff
Parents: f718b4b
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Fri Sep 11 18:05:20 2015 +0900
Committer: Yu ISHIKAWA <yu...@gmail.com>
Committed: Fri Sep 11 18:05:20 2015 +0900

----------------------------------------------------------------------
 .../org/apache/nifi/processors/aws/s3/DeleteS3Object.java | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/eb1d6b55/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index c8950c3..836e0d8 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -18,15 +18,14 @@ package org.apache.nifi.processors.aws.s3;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteVersionRequest;
+
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -35,7 +34,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
 
@@ -46,9 +44,6 @@ import org.apache.nifi.processor.util.StandardValidators;
         "And the FlowFiles are checked if exists or not before deleting.")
 public class DeleteS3Object extends AbstractS3Processor {
 
-    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
-            .description("FlowFiles are routed to 'not found' if it doesn't exist on Amazon S3").build();
-
     public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
             .name("Version")
             .description("The Version of the Object to delete")
@@ -61,9 +56,6 @@ public class DeleteS3Object extends AbstractS3Processor {
             Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
                     FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
 
-    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
-            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_NOT_FOUND)));
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;


[27/41] nifi git commit: NIFI-1046 shell clean up: quoting prevents globbing & word splitting.

Posted by ma...@apache.org.
NIFI-1046 shell clean up: quoting prevents globbing & word splitting.

Ignoring the unquoted array expansions on L188/190; suspect they ought to be quoted per [SC2068](https://github.com/koalaman/shellcheck/wiki/SC2068), but need to confirm desired arguments.
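
A minimal sketch of the hazard the quoting avoids (illustrative shell, not part of this patch): an unquoted $@ is re-split on whitespace and re-globbed, while "$@" preserves each argument intact, which is what SC2068 warns about.

    #!/bin/sh
    set -- "a b" "c*"          # two arguments: one containing a space, one containing a glob character
    printf '<%s>\n' $@         # unquoted: word-splits to <a> <b> and glob-expands c* against the cwd
    printf '<%s>\n' "$@"       # quoted: prints <a b> and <c*> exactly as passed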

Reviewed by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9aa716b0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9aa716b0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9aa716b0

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 9aa716b0592bfd3552880b4d0042db499307f778
Parents: 58d9b02
Author: Alex Moundalexis <al...@cloudera.com>
Authored: Mon Oct 19 11:02:39 2015 -0700
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 19 20:36:07 2015 -0400

----------------------------------------------------------------------
 .../src/main/resources/bin/nifi.sh              | 26 +++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9aa716b0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index f8d90c4..1958e34 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -21,7 +21,9 @@
 
 # Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
 
-NIFI_HOME=$(cd $(dirname "$0") && cd .. && pwd)
+SCRIPT_DIR=$(dirname "$0")
+SCRIPT_NAME=$(basename "$0")
+NIFI_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
 PROGNAME=$(basename "$0")
 
 
@@ -137,21 +139,21 @@ install() {
         fi
 
         SVC_FILE=/etc/init.d/$SVC_NAME
-        cp $0 $SVC_FILE
-        sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": $SVC_FILE
-        sed -i s:PROGNAME=.*:PROGNAME=$(basename "$0"): $SVC_FILE
-        rm -f /etc/rc2.d/S65${SVC_NAME}
-        ln -s /etc/init.d/$SVC_NAME /etc/rc2.d/S65${SVC_NAME}
-        rm -f /etc/rc2.d/K65${SVC_NAME}
-        ln -s /etc/init.d/$SVC_NAME /etc/rc2.d/K65${SVC_NAME}
-        echo Service $SVC_NAME installed
+        cp "$0" "$SVC_FILE"
+        sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": "$SVC_FILE"
+        sed -i s:PROGNAME=.*:PROGNAME="${SCRIPT_NAME}": "$SVC_FILE"
+        rm -f "/etc/rc2.d/S65${SVC_NAME}"
+        ln -s "/etc/init.d/$SVC_NAME" "/etc/rc2.d/S65${SVC_NAME}"
+        rm -f "/etc/rc2.d/K65${SVC_NAME}"
+        ln -s "/etc/init.d/$SVC_NAME" "/etc/rc2.d/K65${SVC_NAME}"
+        echo "Service $SVC_NAME installed"
 }
 
 
 run() {
     BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf";
 
-    run_as=$(grep run.as ${BOOTSTRAP_CONF} | cut -d'=' -f2)
+    run_as=$(grep run.as "${BOOTSTRAP_CONF}" | cut -d'=' -f2)
 
     sudo_cmd_prefix=""
     if $cygwin; then
@@ -183,9 +185,9 @@ run() {
     # run 'start' in the background because the process will continue to run, monitoring NiFi.
     # all other commands will terminate quickly so want to just wait for them
     if [ "$1" = "start" ]; then
-        (cd $NIFI_HOME && ${sudo_cmd_prefix} "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ &)
+        (cd "$NIFI_HOME" && ${sudo_cmd_prefix} "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ &)
     else
-        (cd $NIFI_HOME && ${sudo_cmd_prefix} "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@)
+        (cd "$NIFI_HOME" && ${sudo_cmd_prefix} "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@)
     fi
 
     # Wait just a bit (3 secs) to wait for the logging to finish and then echo a new-line.


[12/41] nifi git commit: NIFI-442: NIFI-828: - Always selecting the first item in the new component table. - Enabling adding the selected component by typing Enter. - Removing the 'filter by' in the new component dialogs and instead just searching every field.

Posted by ma...@apache.org.
NIFI-442:
NIFI-828:
- Always selecting the first item in the new component table.
- Enabling adding the selected component by typing Enter.
- Removing the 'filter by' in the new component dialogs and instead just searching every field.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2583d786
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2583d786
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2583d786

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 2583d7869acedcbe536d4dd5eaf2e7c8f9cf149c
Parents: 31fba6b
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Oct 7 19:11:55 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Oct 7 19:11:55 2015 -0400

----------------------------------------------------------------------
 .../canvas/new-controller-service-dialog.jsp    |   1 -
 .../partials/canvas/new-processor-dialog.jsp    |   1 -
 .../canvas/new-reporting-task-dialog.jsp        |   1 -
 .../css/new-controller-service-dialog.css       |   9 --
 .../main/webapp/css/new-processor-dialog.css    |   9 --
 .../webapp/css/new-reporting-task-dialog.css    |   9 --
 .../webapp/js/nf/canvas/nf-canvas-toolbox.js    |  60 ++++----
 .../src/main/webapp/js/nf/canvas/nf-settings.js | 140 +++++++++----------
 8 files changed, 105 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-controller-service-dialog.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-controller-service-dialog.jsp
index eb54ace..9463cb9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-controller-service-dialog.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-controller-service-dialog.jsp
@@ -20,7 +20,6 @@
         <div id="controller-service-type-filter-controls">
             <div id="controller-service-type-filter-container">
                 <input type="text" id="controller-service-type-filter"/>
-                <div id="controller-service-type-filter-options"></div>
             </div>
             <div id="controller-service-type-filter-status">
                 Displaying&nbsp;<span id="displayed-controller-service-types"></span>&nbsp;of&nbsp;<span id="total-controller-service-types"></span>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-processor-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-processor-dialog.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-processor-dialog.jsp
index df7766c..ad33c1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-processor-dialog.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-processor-dialog.jsp
@@ -20,7 +20,6 @@
         <div id="processor-type-filter-controls">
             <div id="processor-type-filter-container">
                 <input type="text" id="processor-type-filter"/>
-                <div id="processor-type-filter-options"></div>
             </div>
             <div id="processor-type-filter-status">
                 Displaying&nbsp;<span id="displayed-processor-types"></span>&nbsp;of&nbsp;<span id="total-processor-types"></span>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-reporting-task-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-reporting-task-dialog.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-reporting-task-dialog.jsp
index cfb3992..2025aeb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-reporting-task-dialog.jsp
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-reporting-task-dialog.jsp
@@ -20,7 +20,6 @@
         <div id="reporting-task-type-filter-controls">
             <div id="controller-service-type-filter-container">
                 <input type="text" id="reporting-task-type-filter"/>
-                <div id="reporting-task-type-filter-options"></div>
             </div>
             <div id="reporting-task-type-filter-status">
                 Displaying&nbsp;<span id="displayed-reporting-task-types"></span>&nbsp;of&nbsp;<span id="total-reporting-task-types"></span>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-controller-service-dialog.css
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-controller-service-dialog.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-controller-service-dialog.css
index ff8d19e..57c3c55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-controller-service-dialog.css
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-controller-service-dialog.css
@@ -139,14 +139,5 @@ div.availability-label {
     line-height: 20px;
     width: 173px;
     border: 1px solid #ccc;
-    margin-right: 3px;
     float: left;
-}
-
-#controller-service-type-filter-options {
-    float: left;
-    height: 17px;
-    line-height: 17px;
-    width: 85px;
-    margin-top: 1px;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-processor-dialog.css
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-processor-dialog.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-processor-dialog.css
index d5871ac..9ccd23e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-processor-dialog.css
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-processor-dialog.css
@@ -118,14 +118,5 @@
     line-height: 20px;
     width: 173px;
     border: 1px solid #ccc;
-    margin-right: 3px;
     float: left;
-}
-
-#processor-type-filter-options {
-    float: left;
-    height: 17px;
-    line-height: 17px;
-    width: 85px;
-    margin-top: 1px;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-reporting-task-dialog.css
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-reporting-task-dialog.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-reporting-task-dialog.css
index 24b4380..c8d629f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-reporting-task-dialog.css
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/new-reporting-task-dialog.css
@@ -139,14 +139,5 @@ div.availability-label {
     line-height: 20px;
     width: 173px;
     border: 1px solid #ccc;
-    margin-right: 3px;
     float: left;
-}
-
-#reporting-task-type-filter-options {
-    float: left;
-    height: 17px;
-    line-height: 17px;
-    width: 85px;
-    margin-top: 1px;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-toolbox.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-toolbox.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-toolbox.js
index 2812142..3ed1f1f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-toolbox.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-toolbox.js
@@ -106,10 +106,14 @@ nf.CanvasToolbox = (function () {
 
             // update the search criteria
             processorTypesData.setFilterArgs({
-                searchString: getFilterText(),
-                property: $('#processor-type-filter-options').combo('getSelectedOption').value
+                searchString: getFilterText()
             });
             processorTypesData.refresh();
+            
+            // update the selection if possible
+            if (processorTypesData.getLength() > 0) {
+                processorTypesGrid.setSelectedRows([0]);
+            }
         }
     };
 
@@ -134,7 +138,9 @@ nf.CanvasToolbox = (function () {
         }
 
         // determine if the item matches the filter
-        return item[args.property].search(filterExp) >= 0;
+        var matchesLabel = item['label'].search(filterExp) >= 0;
+        var matchesTags = item['tags'].search(filterExp) >= 0;
+        return matchesLabel || matchesTags;
     };
 
     /**
@@ -332,10 +338,18 @@ nf.CanvasToolbox = (function () {
         // show the dialog
         $('#new-processor-dialog').modal('show');
         
-        // set the focus in the filter field
-        $('#processor-type-filter').focus();
+        // setup the filter
+        $('#processor-type-filter').focus().off('keyup').on('keyup', function (e) {
+            var code = e.keyCode ? e.keyCode : e.which;
+            if (code === $.ui.keyCode.ENTER) {
+                addProcessor();
+            } else {
+                applyFilter();
+            }
+        });
 
         // adjust the grid canvas now that its been rendered
+        grid.setSelectedRows([0]);
         grid.resizeCanvas();
     };
 
@@ -870,20 +884,6 @@ nf.CanvasToolbox = (function () {
                 addToolboxIcon(config.type.template, toolbox, 'template-icon', 'template-icon-hover', 'template-icon-drag', promptForTemplate);
                 addToolboxIcon(config.type.label, toolbox, 'label-icon', 'label-icon-hover', 'label-icon-drag', createLabel);
 
-                // specify the combo options
-                $('#processor-type-filter-options').combo({
-                    options: [{
-                            text: 'by type',
-                            value: 'label'
-                        }, {
-                            text: 'by tag',
-                            value: 'tags'
-                        }],
-                    select: function (option) {
-                        applyFilter();
-                    }
-                });
-
                 // initialize the processor type table
                 var processorTypesColumns = [
                     {id: 'type', name: 'Type', field: 'label', sortable: true, resizable: true},
@@ -904,8 +904,7 @@ nf.CanvasToolbox = (function () {
                 });
                 processorTypesData.setItems([]);
                 processorTypesData.setFilterArgs({
-                    searchString: getFilterText(),
-                    property: $('#processor-type-filter-options').combo('getSelectedOption').value
+                    searchString: getFilterText()
                 });
                 processorTypesData.setFilter(filter);
 
@@ -1008,9 +1007,7 @@ nf.CanvasToolbox = (function () {
                 }).fail(nf.Common.handleAjaxError);
 
                 // define the function for filtering the list
-                $('#processor-type-filter').keyup(function () {
-                    applyFilter();
-                }).focus(function () {
+                $('#processor-type-filter').focus(function () {
                     if ($(this).hasClass(config.styles.filterList)) {
                         $(this).removeClass(config.styles.filterList).val('');
                     }
@@ -1024,6 +1021,9 @@ nf.CanvasToolbox = (function () {
                 $('#new-processor-dialog').modal({
                     headerText: 'Add Processor',
                     overlayBackground: false
+                }).draggable({
+                    containment: 'parent',
+                    handle: '.dialog-header'
                 });
 
                 // configure the new port dialog
@@ -1035,6 +1035,9 @@ nf.CanvasToolbox = (function () {
                             $('#new-port-name').val('');
                         }
                     }
+                }).draggable({
+                    containment: 'parent',
+                    handle: '.dialog-header'
                 });
 
                 // configure the new process group dialog
@@ -1046,6 +1049,9 @@ nf.CanvasToolbox = (function () {
                             $('#new-process-group-name').val('');
                         }
                     }
+                }).draggable({
+                    containment: 'parent',
+                    handle: '.dialog-header'
                 });
 
                 // configure the new remote process group dialog
@@ -1057,12 +1063,18 @@ nf.CanvasToolbox = (function () {
                             $('#new-remote-process-group-uri').val('');
                         }
                     }
+                }).draggable({
+                    containment: 'parent',
+                    handle: '.dialog-header'
                 });
 
                 // configure the instantiate template dialog
                 $('#instantiate-template-dialog').modal({
                     headerText: 'Instantiate Template',
                     overlayBackgroud: false
+                }).draggable({
+                    containment: 'parent',
+                    handle: '.dialog-header'
                 });
             } else {
                 // add disabled icons

http://git-wip-us.apache.org/repos/asf/nifi/blob/2583d786/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index 476f34b..f7e9472 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -156,14 +156,14 @@ nf.Settings = (function () {
 
             // update the search criteria
             controllerServiceTypesData.setFilterArgs({
-                searchString: getControllerServiceTypeFilterText(),
-                property: $('#controller-service-type-filter-options').combo('getSelectedOption').value
+                searchString: getControllerServiceTypeFilterText()
             });
-
-            // need to invalidate the entire table since parent elements may need to be 
-            // rerendered due to changes in their children
             controllerServiceTypesData.refresh();
-            controllerServiceTypesGrid.invalidate();
+
+            // update the selection if possible
+            if (controllerServiceTypesData.getLength() > 0) {
+                controllerServiceTypesGrid.setSelectedRows([0]);
+            }
         }
     };
 
@@ -249,7 +249,9 @@ nf.Settings = (function () {
         }
 
         // determine if the item matches the filter
-        return item[args.property].search(filterExp) >= 0;
+        var matchesLabel = item['label'].search(filterExp) >= 0;
+        var matchesTags = item['tags'].search(filterExp) >= 0;
+        return matchesLabel || matchesTags;
     };
 
     /**
@@ -279,6 +281,23 @@ nf.Settings = (function () {
     };
 
     /**
+     * Adds the currently selected controller service.
+     */
+    var addSelectedControllerService = function () {
+        var selectedServiceType = $('#selected-controller-service-type').text();
+
+        // ensure something was selected
+        if (selectedServiceType === '') {
+            nf.Dialog.showOkDialog({
+                dialogContent: 'The type of controller service to create must be selected.',
+                overlayBackground: false
+            });
+        } else {
+            addControllerService(selectedServiceType);
+        }
+    };
+
+    /**
      * Adds a new controller service of the specified type.
      * 
      * @param {string} controllerServiceType
@@ -334,20 +353,6 @@ nf.Settings = (function () {
      * Initializes the new controller service dialog.
      */
     var initNewControllerServiceDialog = function () {
-        // specify the combo options
-        $('#controller-service-type-filter-options').combo({
-            options: [{
-                    text: 'by type',
-                    value: 'label'
-                }, {
-                    text: 'by tag',
-                    value: 'tags'
-                }],
-            select: function (option) {
-                applyControllerServiceTypeFilter();
-            }
-        });
-
         // specify the controller service availability
         if (nf.Canvas.isClustered()) {
             $('#controller-service-availability-combo').combo({
@@ -365,8 +370,13 @@ nf.Settings = (function () {
         }
 
         // define the function for filtering the list
-        $('#controller-service-type-filter').keyup(function () {
-            applyControllerServiceTypeFilter();
+        $('#controller-service-type-filter').on('keyup', function (e) {
+            var code = e.keyCode ? e.keyCode : e.which;
+            if (code === $.ui.keyCode.ENTER) {
+                addSelectedControllerService();
+            } else {
+                applyControllerServiceTypeFilter();
+            }
         }).focus(function () {
             if ($(this).hasClass(config.styles.filterList)) {
                 $(this).removeClass(config.styles.filterList).val('');
@@ -389,8 +399,7 @@ nf.Settings = (function () {
         });
         controllerServiceTypesData.setItems([]);
         controllerServiceTypesData.setFilterArgs({
-            searchString: getControllerServiceTypeFilterText(),
-            property: $('#controller-service-type-filter-options').combo('getSelectedOption').value
+            searchString: getControllerServiceTypeFilterText()
         });
         controllerServiceTypesData.setFilter(filterControllerServiceTypes);
 
@@ -493,17 +502,7 @@ nf.Settings = (function () {
                     buttonText: 'Add',
                     handler: {
                         click: function () {
-                            var selectedServiceType = $('#selected-controller-service-type').text();
-                            
-                            // ensure something was selected
-                            if (selectedServiceType === '') {
-                                nf.Dialog.showOkDialog({
-                                    dialogContent: 'The type of controller service to create must be selected.',
-                                    overlayBackground: false
-                                });
-                            } else {
-                                addControllerService(selectedServiceType);
-                            }
+                            addSelectedControllerService();
                         }
                     }
                 }, {
@@ -905,14 +904,14 @@ nf.Settings = (function () {
 
             // update the search criteria
             reportingTaskTypesData.setFilterArgs({
-                searchString: getReportingTaskTypeFilterText(),
-                property: $('#reporting-task-type-filter-options').combo('getSelectedOption').value
+                searchString: getReportingTaskTypeFilterText()
             });
-
-            // need to invalidate the entire table since parent elements may need to be 
-            // rerendered due to changes in their children
             reportingTaskTypesData.refresh();
-            reportingTaskTypesGrid.invalidate();
+
+            // update the selection if possible
+            if (reportingTaskTypesData.getLength() > 0) {
+                reportingTaskTypesGrid.setSelectedRows([0]);
+            }
         }
     };
 
@@ -978,6 +977,23 @@ nf.Settings = (function () {
     };
 
     /**
+     * Adds the currently selected reporting task.
+     */
+    var addSelectedReportingTask = function () {
+        var selectedTaskType = $('#selected-reporting-task-type').text();
+
+        // ensure something was selected
+        if (selectedTaskType === '') {
+            nf.Dialog.showOkDialog({
+                dialogContent: 'The type of reporting task to create must be selected.',
+                overlayBackground: false
+            });
+        } else {
+            addReportingTask(selectedTaskType);
+        }
+    };
+    
+    /**
      * Adds a new reporting task of the specified type.
      * 
      * @param {string} reportingTaskType
@@ -1033,20 +1049,6 @@ nf.Settings = (function () {
      * Initializes the new reporting task dialog.
      */
     var initNewReportingTaskDialog = function () {
-        // specify the combo options
-        $('#reporting-task-type-filter-options').combo({
-            options: [{
-                    text: 'by type',
-                    value: 'label'
-                }, {
-                    text: 'by tag',
-                    value: 'tags'
-                }],
-            select: function (option) {
-                applyReportingTaskTypeFilter();
-            }
-        });
-
         // specify the reporting task availability
         if (nf.Canvas.isClustered()) {
             $('#reporting-task-availability-combo').combo({
@@ -1064,8 +1066,13 @@ nf.Settings = (function () {
         }
 
         // define the function for filtering the list
-        $('#reporting-task-type-filter').keyup(function () {
-            applyReportingTaskTypeFilter();
+        $('#reporting-task-type-filter').on('keyup', function (e) {
+            var code = e.keyCode ? e.keyCode : e.which;
+            if (code === $.ui.keyCode.ENTER) {
+                addSelectedReportingTask();
+            } else {
+                applyReportingTaskTypeFilter();
+            }
         }).focus(function () {
             if ($(this).hasClass(config.styles.filterList)) {
                 $(this).removeClass(config.styles.filterList).val('');
@@ -1088,8 +1095,7 @@ nf.Settings = (function () {
         });
         reportingTaskTypesData.setItems([]);
         reportingTaskTypesData.setFilterArgs({
-            searchString: getReportingTaskTypeFilterText(),
-            property: $('#reporting-task-type-filter-options').combo('getSelectedOption').value
+            searchString: getReportingTaskTypeFilterText()
         });
         reportingTaskTypesData.setFilter(filterReportingTaskTypes);
 
@@ -1192,17 +1198,7 @@ nf.Settings = (function () {
                     buttonText: 'Add',
                     handler: {
                         click: function () {
-                            var selectedTaskType = $('#selected-reporting-task-type').text();
-                            
-                            // ensure something was selected
-                            if (selectedTaskType === '') {
-                                nf.Dialog.showOkDialog({
-                                    dialogContent: 'The type of reporting task to create must be selected.',
-                                    overlayBackground: false
-                                });
-                            } else {
-                                addReportingTask(selectedTaskType);
-                            }
+                            addSelectedReportingTask();
                         }
                     }
                 }, {
@@ -1587,6 +1583,7 @@ nf.Settings = (function () {
                     // reset the canvas size after the dialog is shown
                     var controllerServiceTypesGrid = $('#controller-service-types-table').data('gridInstance');
                     if (nf.Common.isDefinedAndNotNull(controllerServiceTypesGrid)) {
+                        controllerServiceTypesGrid.setSelectedRows([0]);
                         controllerServiceTypesGrid.resizeCanvas();
                     }
                     
@@ -1598,6 +1595,7 @@ nf.Settings = (function () {
                     // reset the canvas size after the dialog is shown
                     var reportingTaskTypesGrid = $('#reporting-task-types-table').data('gridInstance');
                     if (nf.Common.isDefinedAndNotNull(reportingTaskTypesGrid)) {
+                        reportingTaskTypesGrid.setSelectedRows([0]);
                         reportingTaskTypesGrid.resizeCanvas();
                     }
                     


[15/41] nifi git commit: NIFI-1024 Correcting references to nifi.cluster.manager.protocol.port

Posted by ma...@apache.org.
NIFI-1024 Correcting references to nifi.cluster.manager.protocol.port


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f6d34279
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f6d34279
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f6d34279

Branch: refs/heads/NIFI-810-InputRequirement
Commit: f6d3427955c48052bc214e08efcb24934d9a4cba
Parents: f798127
Author: Aldrin Piri <al...@apache.org>
Authored: Tue Oct 6 13:00:30 2015 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Oct 9 14:22:00 2015 -0400

----------------------------------------------------------------------
 nifi-docs/src/main/asciidoc/administration-guide.adoc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f6d34279/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 75523bf..fb109c9 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -336,7 +336,7 @@ For the NCM, the minimum properties to configure are as follows:
 * Under the Web Properties, set either the http or https port that you want the NCM to run on. If the NCM and one of the nodes are on the same server, make sure this port is different from the web port used by the node.
 * Under the Cluster Manager Properties, set the following:
 ** nifi.cluster.is.manager - Set this to _true_.
-** nifi.cluster.protocol.manager.port - Set this to an open port that is higher than 1024 (anything lower requires root). Take note of this setting, as you will need to reference it when you set up the nodes.
+** nifi.cluster.manager.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root). Take note of this setting, as you will need to reference it when you set up the nodes.
 
 For Node 1, the minimum properties to configure are as follows:
 
@@ -344,7 +344,7 @@ For Node 1, the minimum properties to configure are as follows:
 * Under Cluster Node Properties, set the following:
 ** nifi.cluster.is.node - Set this to _true_.
 ** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost".
-** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root). If Node 1 and the NCM are on the same server, make sure this port is different from the nifi.cluster.protocol.manager.port.
+** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root). If Node 1 and the NCM are on the same server, make sure this port is different from the nifi.cluster.manager.protocol.port.
 ** nifi.cluster.node.unicast.manager.address - Set this to the NCM's fully qualified hostname.  
 ** nifi.cluster.node.unicast.manager.protocol.port - Set this to exactly the same port that was set on the NCM for the property nifi.cluster.manager.protocol.port.
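
To make the pairing concrete, a hedged sketch of the minimum settings described above (hostnames and port numbers are illustrative placeholders, not defaults):

    # NCM nifi.properties
    nifi.cluster.is.manager=true
    nifi.cluster.manager.protocol.port=9001

    # Node 1 nifi.properties
    nifi.cluster.is.node=true
    nifi.cluster.node.address=node1.example.com
    nifi.cluster.node.protocol.port=9002
    nifi.cluster.node.unicast.manager.address=ncm.example.com
    nifi.cluster.node.unicast.manager.protocol.port=9001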
 


[16/41] nifi git commit: Merge remote-tracking branch 'yu/NIFI-774'

Posted by ma...@apache.org.
Merge remote-tracking branch 'yu/NIFI-774'


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fad81d87
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fad81d87
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fad81d87

Branch: refs/heads/NIFI-810-InputRequirement
Commit: fad81d872d51d797bcc681977cfcb5fbda482257
Parents: f6d3427 eb1d6b5
Author: danbress <db...@onyxconsults.com>
Authored: Mon Oct 12 09:55:52 2015 -0400
Committer: danbress <db...@onyxconsults.com>
Committed: Mon Oct 12 09:55:52 2015 -0400

----------------------------------------------------------------------
 .../processors/aws/AbstractAWSProcessor.java    |   2 +-
 .../nifi/processors/aws/s3/DeleteS3Object.java  |  98 +++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/aws/s3/TestDeleteS3Object.java   | 140 +++++++++++++++++++
 4 files changed, 240 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[18/41] nifi git commit: NIFI-1043 fix for erroneous whitespace in maven archetype. This closes #100. Signed off by Tony Kurc

Posted by ma...@apache.org.
NIFI-1043 fix for erroneous whitespace in maven archetype. This closes #100. Signed off by Tony Kurc <tk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ce7d098a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ce7d098a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ce7d098a

Branch: refs/heads/NIFI-810-InputRequirement
Commit: ce7d098a485d3a3a58f52e269f48be4969e13f9b
Parents: 49ee06b
Author: Wouter de Bie <wo...@spotify.com>
Authored: Sat Oct 17 09:00:18 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 17 09:00:18 2015 -0400

----------------------------------------------------------------------
 .../src/main/java/MyProcessor.java                       | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ce7d098a/nifi-maven-archetypes/nifi-processor-bundle-archetype/src/main/resources/archetype-resources/nifi-__artifactBaseName__-processors/src/main/java/MyProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-maven-archetypes/nifi-processor-bundle-archetype/src/main/resources/archetype-resources/nifi-__artifactBaseName__-processors/src/main/java/MyProcessor.java b/nifi-maven-archetypes/nifi-processor-bundle-archetype/src/main/resources/archetype-resources/nifi-__artifactBaseName__-processors/src/main/java/MyProcessor.java
index 7b70dca..ca5d903 100644
--- a/nifi-maven-archetypes/nifi-processor-bundle-archetype/src/main/resources/archetype-resources/nifi-__artifactBaseName__-processors/src/main/java/MyProcessor.java
+++ b/nifi-maven-archetypes/nifi-processor-bundle-archetype/src/main/resources/archetype-resources/nifi-__artifactBaseName__-processors/src/main/java/MyProcessor.java
@@ -84,13 +84,10 @@ public class MyProcessor extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-		FlowFile flowFile = session.get();
-		if ( flowFile == null ) {
-			return;
-		}
-
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
         // TODO implement
-
     }
-
 }


[05/41] nifi git commit: Add a relationship for "not found" to which FlowFiles are transferred if a target key doesn't exist on S3

Posted by ma...@apache.org.
Add a relationship for "not found" to which FlowFiles are transferred if a target key doesn't exist on S3


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0334f046
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0334f046
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0334f046

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 0334f04640c7ae0b070214df7e356b2b05a8051c
Parents: 213f507
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Wed Sep 2 13:03:05 2015 +0900
Committer: Yu ISHIKAWA <yu...@gmail.com>
Committed: Wed Sep 2 13:48:39 2015 +0900

----------------------------------------------------------------------
 .../processors/aws/AbstractAWSProcessor.java    |  2 +-
 .../nifi/processors/aws/s3/DeleteS3Object.java  | 30 ++++++++++++++++----
 .../processors/aws/s3/TestDeleteS3Object.java   | 19 +++++++++++--
 3 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0334f046/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index a781ff9..e2ae31e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -54,7 +54,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
             .description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
 
-    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+    public static Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
 
     public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/0334f046/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 2cc00db..803a6ab 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -18,14 +18,15 @@ package org.apache.nifi.processors.aws.s3;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteVersionRequest;
-
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -34,6 +35,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
 
@@ -44,6 +46,9 @@ import org.apache.nifi.processor.util.StandardValidators;
         "And the FlowFiles are checked if exists or not before deleting.")
 public class DeleteS3Object extends AbstractS3Processor {
 
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+            .description("FlowFiles are routed to 'not found' if it doesn't exist on Amazon S3").build();
+
     public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
             .name("Version")
             .description("The Version of the Object to delete")
@@ -56,6 +61,14 @@ public class DeleteS3Object extends AbstractS3Processor {
             Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
                     FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
 
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_NOT_FOUND)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
@@ -75,12 +88,19 @@ public class DeleteS3Object extends AbstractS3Processor {
         final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
 
         final AmazonS3 s3 = getClient();
+
+        // Checks if the key exists or not
+        // If there is no such key, an exception is thrown
         try {
-            // Checks if the key exists or not
-            // If there is no such a key, then throws a exception
-            s3.getObjectMetadata(bucket, key);
+          s3.getObjectMetadata(bucket, key);
+        } catch (final AmazonServiceException ase) {
+            getLogger().error("No such key on Amazon S3 for {}", new Object[]{flowFile, ase});
+            session.transfer(flowFile, REL_NOT_FOUND);
+            return;
+        }
 
-            // Deletes a key on Amazon S3
+        // Deletes a key on Amazon S3
+        try {
             if (versionId == null) {
                 final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key);
                 s3.deleteObject(r);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0334f046/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index cac55e5..04d9e61 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -22,15 +22,21 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import com.amazonaws.auth.PropertiesCredentials;
 import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.*;
-
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CreateBucketRequest;
+import com.amazonaws.services.s3.model.DeleteBucketRequest;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
@@ -122,7 +128,14 @@ public class TestDeleteS3Object {
         runner.enqueue(new byte[0], attrs);
         runner.run(1);
 
-        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_NOT_FOUND, 1);
+    }
+
+    @Test
+    public void testGetRelationships() {
+        DeleteS3Object deleter = new DeleteS3Object();
+        Set<Relationship> relationships = deleter.getRelationships();
+        assertEquals(3, relationships.size());
     }
 
     // Uploads a test file
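
The heart of the change is a probe-before-delete pattern: getObjectMetadata() serves as the existence check, and an AmazonServiceException is taken to mean the key is absent. A condensed, self-contained sketch of that pattern against the same AWS SDK (bucket and key values are hypothetical):

    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3Client;
    import com.amazonaws.services.s3.model.DeleteObjectRequest;

    public class ExistenceCheckSketch {
        public static void main(String[] args) {
            final AmazonS3 s3 = new AmazonS3Client();   // credentials from the default provider chain
            final String bucket = "my-test-bucket";     // hypothetical
            final String key = "path/to/object";        // hypothetical

            // Probe for the key first; S3 answers a missing key with a service exception.
            try {
                s3.getObjectMetadata(bucket, key);
            } catch (final AmazonServiceException ase) {
                System.out.println("Key not found, would route to 'not found': " + ase.getErrorCode());
                return;
            }

            // Only reached when the probe succeeded.
            s3.deleteObject(new DeleteObjectRequest(bucket, key));
        }
    }

One caveat worth noting: as written, both here and in the patch, any service error (not only a 404) is treated as "not found"; checking ase.getStatusCode() == 404 would be stricter.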


[31/41] nifi git commit: Merge branch 'NIFI-988' of https://github.com/ImpressTV/nifi into NIFI-988

Posted by ma...@apache.org.
Merge branch 'NIFI-988' of https://github.com/ImpressTV/nifi into NIFI-988


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97441ea0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97441ea0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97441ea0

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 97441ea0c23ebdc7c51939ff28ba270012c0fe55
Parents: 22924c6 ee7d89c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 21 10:18:05 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 21 10:18:05 2015 -0400

----------------------------------------------------------------------
 .../standard/PutDistributedMapCache.java        | 244 ++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestPutDistributedMapCache.java    | 280 +++++++++++++++++++
 3 files changed, 525 insertions(+)
----------------------------------------------------------------------



[21/41] nifi git commit: NIFI-1035 AbstractKiteProcessor.getSchema leaks file handles. closes #101. Close file handles when reading schema from resources or paths in AbstractKiteProcessor.getSchema

Posted by ma...@apache.org.
NIFI-1035 AbstractKiteProcessor.getSchema leaks file handles.  closes #101.
Close file handles when reading schema from resources or paths in AbstractKiteProcessor.getSchema

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/943ccfed
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/943ccfed
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/943ccfed

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 943ccfedef0cfd8896814db203646e3e4de8cf8f
Parents: 22924c6
Author: Alan Jackoway <al...@cloudera.com>
Authored: Mon Oct 12 15:30:20 2015 -0700
Committer: joewitt <jo...@apache.org>
Committed: Sun Oct 18 18:56:26 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/kite/AbstractKiteProcessor.java      | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/943ccfed/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index da1c046..f90c089 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -121,14 +121,17 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
             if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
                 return Datasets.load(uri).getDataset().getDescriptor().getSchema();
             } else if ("resource".equals(uri.getScheme())) {
-                InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
-                        .openStream();
-                return parseSchema(uri, in);
+                try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
+                        .openStream()) {
+                    return parseSchema(uri, in);
+                }
             } else {
                 // try to open the file
                 Path schemaPath = new Path(uri);
                 FileSystem fs = schemaPath.getFileSystem(conf);
-                return parseSchema(uri, fs.open(schemaPath));
+                try (InputStream in = fs.open(schemaPath)) {
+                    return parseSchema(uri, in);
+                }
             }
 
         } catch (DatasetNotFoundException e) {
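
The fix is the standard try-with-resources idiom: the InputStream is declared in the try header, so it is closed when the block exits, whether parseSchema() returns normally or throws. A minimal stand-alone illustration of the idiom (the resource name is hypothetical):

    import java.io.IOException;
    import java.io.InputStream;

    public class TryWithResourcesSketch {
        public static void main(String[] args) throws IOException {
            // The stream is closed automatically at the end of the block,
            // even if an exception is thrown inside it.
            try (InputStream in = TryWithResourcesSketch.class.getResourceAsStream("/schema.avsc")) {
                if (in == null) {
                    throw new IOException("resource not found");   // hypothetical resource
                }
                System.out.println("first byte: " + in.read());
            }
        }
    }

Before the patch, the streams returned by Resources.getResource(...).openStream() and fs.open(schemaPath) were never closed once the schema had been parsed, leaking one file handle per schema load.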


[13/41] nifi git commit: NIFI-1030 made InvokeHttp basic auth password a sensitive value

Posted by ma...@apache.org.
NIFI-1030 made InvokeHttp basic auth password a sensitive value

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b4bfcc1f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b4bfcc1f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b4bfcc1f

Branch: refs/heads/NIFI-810-InputRequirement
Commit: b4bfcc1f21fed3209bf4a8f187616cdbb3d1a5c9
Parents: 2583d78
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Thu Oct 8 15:33:16 2015 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Oct 8 15:46:12 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/standard/InvokeHTTP.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b4bfcc1f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index f16eb9c..bf1fb4e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -277,6 +277,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .displayName("Basic Authentication Password")
                 .description("The password to be used by the client to authenticate against the Remote URL.")
                 .required(false)
+                .sensitive(true)
                 .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
                 .build();
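
Marking a descriptor sensitive(true) tells the framework to mask the value in the UI and avoid persisting it in clear text. A hedged sketch of a complete descriptor built the same way (the name, description, and validator pattern are illustrative, not taken from the patch):

    import java.util.regex.Pattern;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.util.StandardValidators;

    public class SensitivePropertySketch {
        // Illustrative descriptor following the same builder pattern as the patch.
        public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
                .name("Password")
                .description("The password used to authenticate against the remote service.")
                .required(false)
                .sensitive(true)   // masked in the UI and not stored in clear text
                .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e]+$")))
                .build();
    }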
 


[38/41] nifi git commit: NIFI-1055: Fixed checkstyle violations

Posted by ma...@apache.org.
NIFI-1055: Fixed checkstyle violations


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0fc5d304
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0fc5d304
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0fc5d304

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 0fc5d3046178836365f710d312cef6568126a99d
Parents: 5d90c9b
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:59:24 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 10:08:44 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java |  42 ++++----
 .../processors/kite/TestCSVToAvroProcessor.java |   1 -
 .../nifi/processors/standard/ListenHTTP.java    | 108 +++++++++----------
 .../standard/PutDistributedMapCache.java        |  96 +++++++++--------
 .../standard/TestPutDistributedMapCache.java    |  31 +++---
 5 files changed, 140 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 016750b..f0ba71a 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -50,7 +50,7 @@ import org.apache.nifi.processor.io.StreamCallback;
 
 @SideEffectFree
 @SupportsBatching
-@Tags({ "json", "avro", "binary" })
+@Tags({"json", "avro", "binary"})
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
     + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
@@ -60,41 +60,41 @@ public class ConvertAvroToJSON extends AbstractProcessor {
     protected static final String CONTAINER_ARRAY = "array";
     protected static final String CONTAINER_NONE = "none";
 
-    static final PropertyDescriptor CONTAINER_OPTIONS
-            = new PropertyDescriptor.Builder()
-            .name("JSON container options")
-            .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
-            .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
-            .required(true)
-            .defaultValue(CONTAINER_ARRAY)
-            .build();
+    static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
+        .name("JSON container options")
+        .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE
+            + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
+        .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
+        .required(true)
+        .defaultValue(CONTAINER_ARRAY)
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("A FlowFile is routed to this relationship after it has been converted to JSON")
-            .build();
+        .name("success")
+        .description("A FlowFile is routed to this relationship after it has been converted to JSON")
+        .build();
     static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
-            .build();
+        .name("failure")
+        .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
+        .build();
 
-    
 
     private List<PropertyDescriptor> properties;
-    
+
     @Override
     protected void init(ProcessorInitializationContext context) {
         super.init(context);
-        
+
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONTAINER_OPTIONS);
         this.properties = Collections.unmodifiableList(properties);
-    
     }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
     }
+
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> rels = new HashSet<>();
@@ -118,8 +118,8 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn);
 
-                         final OutputStream out = new BufferedOutputStream(rawOut);
-                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+                        final OutputStream out = new BufferedOutputStream(rawOut);
+                        final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
                         final GenericData genericData = GenericData.get();
                         GenericRecord record = reader.next();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 0cde23c..902ec79 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -58,7 +58,6 @@ public class TestCSVToAvroProcessor {
 
     /**
      * Basic test for tab separated files, similar to #test
-     * @throws IOException
      */
     @Test
     public void testTabSeparatedConversion() throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index a446eb6..9ad1703 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -31,10 +31,12 @@ import java.util.regex.Pattern;
 import javax.servlet.Servlet;
 import javax.ws.rs.Path;
 
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -42,15 +44,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
 import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
 import org.apache.nifi.ssl.SSLContextService;
-
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import org.apache.nifi.stream.io.StreamThrottler;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -70,56 +69,56 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     private List<PropertyDescriptor> properties;
 
     public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Relationship for successfully received FlowFiles")
-            .build();
+        .name("success")
+        .description("Relationship for successfully received FlowFiles")
+        .build();
 
     public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
-            .name("Base Path")
-            .description("Base path for incoming connections")
-            .required(true)
-            .defaultValue("contentListener")
-            .addValidator(StandardValidators.URI_VALIDATOR)
-            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
-            .build();
+        .name("Base Path")
+        .description("Base path for incoming connections")
+        .required(true)
+        .defaultValue("contentListener")
+        .addValidator(StandardValidators.URI_VALIDATOR)
+        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+        .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Listening Port")
-            .description("The Port to listen on for incoming connections")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
+        .name("Listening Port")
+        .description("The Port to listen on for incoming connections")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
     public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
-            .name("Authorized DN Pattern")
-            .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
-            .required(true)
-            .defaultValue(".*")
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .build();
+        .name("Authorized DN Pattern")
+        .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+        .required(true)
+        .defaultValue(".*")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
     public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
-            .name("Max Unconfirmed Flowfile Time")
-            .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
-            .required(true)
-            .defaultValue("60 secs")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
+        .name("Max Unconfirmed Flowfile Time")
+        .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
+        .required(true)
+        .defaultValue("60 secs")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
     public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
-            .name("Max Data to Receive per Second")
-            .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
+        .name("Max Data to Receive per Second")
+        .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an SSL Context")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
+        .name("SSL Context Service")
+        .description("The Controller Service to use in order to obtain an SSL Context")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
     public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
-            .name("HTTP Headers to receive as Attributes (Regex)")
-            .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .required(false)
-            .build();
+        .name("HTTP Headers to receive as Attributes (Regex)")
+        .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .required(false)
+        .build();
 
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -173,7 +172,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             toShutdown.stop();
             toShutdown.destroy();
         } catch (final Exception ex) {
-            getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
+            getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[] {ex});
             this.server = null;
         }
     }
@@ -235,18 +234,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         connector.setPort(port);
 
         // add the connector to the server
-        server.setConnectors(new Connector[]{connector});
+        server.setConnectors(new Connector[] {connector});
 
         final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
         for (final Class<? extends Servlet> cls : getServerClasses()) {
             final Path path = cls.getAnnotation(Path.class);
             // Note: servlets must have a path annotation - this will NPE otherwise
             // also, servlets other than ListenHttpServlet must have a path starting with /
-            if(basePath.isEmpty() && !path.value().isEmpty()){
+            if (basePath.isEmpty() && !path.value().isEmpty()) {
                 // Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
                 contextHandler.addServlet(cls, path.value());
-            }
-            else{
+            } else {
                 contextHandler.addServlet(cls, "/" + basePath + path.value());
             }
         }
@@ -304,7 +302,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         for (final String id : findOldFlowFileIds(context)) {
             final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
             if (wrapper != null) {
-                getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
+                getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[] {id});
                 wrapper.session.rollback();
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
index 8e50c9f..bc1fde5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
@@ -16,6 +16,16 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -33,24 +43,22 @@ import org.apache.nifi.distributed.cache.client.exception.SerializationException
 import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
 @EventDriven
 @SupportsBatching
 @Tags({"map", "cache", "put", "distributed"})
 @CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key " +
-        "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
-        "'keep original' the entry is not replaced.'")
+    "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
+    "'keep original' the entry is not replaced.'")
 @WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
-        "attribute is true, is the FlowFile is cached, otherwise false.")
+    "attribute is true, is the FlowFile is cached, otherwise false.")
 @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
 public class PutDistributedMapCache extends AbstractProcessor {
 
@@ -58,55 +66,55 @@ public class PutDistributedMapCache extends AbstractProcessor {
 
     // Identifies the distributed map cache client
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
-            .name("Distributed Cache Service")
-            .description("The Controller Service that is used to cache flow files")
-            .required(true)
-            .identifiesControllerService(DistributedMapCacheClient.class)
-            .build();
+        .name("Distributed Cache Service")
+        .description("The Controller Service that is used to cache flow files")
+        .required(true)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
 
     // Selects the FlowFile attribute, whose value is used as cache key
     public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
-            .name("Cache Entry Identifier")
-            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
-                    "be evaluated against a FlowFile in order to determine the cache key")
-            .required(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Cache Entry Identifier")
+        .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
+            "be evaluated against a FlowFile in order to determine the cache key")
+        .required(true)
+        .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+        .expressionLanguageSupported(true)
+        .build();
 
     public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present",
-            "Adds the specified entry to the cache, replacing any value that is currently set.");
+        "Adds the specified entry to the cache, replacing any value that is currently set.");
 
     public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
-            "Adds the specified entry to the cache, if the key does not exist.");
+        "Adds the specified entry to the cache, if the key does not exist.");
 
     public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Cache update strategy")
-            .description("Determines how the cache is updated if the cache already contains the entry")
-            .required(true)
-            .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
-            .defaultValue(CACHE_UPDATE_REPLACE.getValue())
-            .build();
+        .name("Cache update strategy")
+        .description("Determines how the cache is updated if the cache already contains the entry")
+        .required(true)
+        .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
+        .defaultValue(CACHE_UPDATE_REPLACE.getValue())
+        .build();
 
     public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder()
-            .name("Max cache entry size")
-            .description("The maximum amount of data to put into cache")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .expressionLanguageSupported(false)
-            .build();
+        .name("Max cache entry size")
+        .description("The maximum amount of data to put into cache")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .expressionLanguageSupported(false)
+        .build();
 
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
-            .build();
+        .name("success")
+        .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
+        .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
-            .build();
+        .name("failure")
+        .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
+        .build();
     private final Set<Relationship> relationships;
 
     private final Serializer<String> keySerializer = new StringSerializer();
@@ -207,7 +215,7 @@ public class PutDistributedMapCache extends AbstractProcessor {
         } catch (final IOException e) {
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
-            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
+            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
index 8347e7f..05d4293 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -26,22 +34,11 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.junit.Assert.assertEquals;
 
 public class TestPutDistributedMapCache {
 
     private TestRunner runner;
     private MockCacheClient service;
-    private PutDistributedMapCache processor;
 
     @Before
     public void setup() throws InitializationException {
@@ -57,7 +54,7 @@ public class TestPutDistributedMapCache {
     public void testNoCacheKey() throws InitializationException {
 
         runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
-        runner.enqueue(new byte[]{});
+        runner.enqueue(new byte[] {});
 
         runner.run();
 
@@ -99,7 +96,7 @@ public class TestPutDistributedMapCache {
         props.put("caheKeyAttribute", "2");
 
         // flow file without content
-        runner.enqueue(new byte[]{}, props);
+        runner.enqueue(new byte[] {}, props);
 
         runner.run();
 
@@ -171,7 +168,7 @@ public class TestPutDistributedMapCache {
 
         runner.clearTransferState();
 
-        //we expect that the cache entry is replaced
+        // we expect that the cache entry is replaced
         value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
         assertEquals(replaced, new String(value, "UTF-8"));
     }
@@ -215,7 +212,7 @@ public class TestPutDistributedMapCache {
 
         runner.clearTransferState();
 
-        //we expect that the cache entry is NOT replaced
+        // we expect that the cache entry is NOT replaced
         value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
         assertEquals(original, new String(value, "UTF-8"));
     }
@@ -225,7 +222,7 @@ public class TestPutDistributedMapCache {
         private boolean failOnCalls = false;
 
         private void verifyNotFail() throws IOException {
-            if ( failOnCalls ) {
+            if (failOnCalls) {
                 throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
             }
         }
@@ -240,7 +237,7 @@ public class TestPutDistributedMapCache {
         @Override
         @SuppressWarnings("unchecked")
         public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
-                                          final Deserializer<V> valueDeserializer) throws IOException {
+            final Deserializer<V> valueDeserializer) throws IOException {
             verifyNotFail();
             return (V) values.putIfAbsent(key, value);
         }


[04/41] nifi git commit: Add @Override annotations

Posted by ma...@apache.org.
Add @Override annotations


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/213f507f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/213f507f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/213f507f

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 213f507f53b3afa3eef3398ebdd3006fa1cbe292
Parents: 6cbc6db
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Wed Sep 2 13:02:39 2015 +0900
Committer: Yu ISHIKAWA <yu...@gmail.com>
Committed: Wed Sep 2 13:02:39 2015 +0900

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java     | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/213f507f/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 85fc70e..2cc00db 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -56,10 +56,12 @@ public class DeleteS3Object extends AbstractS3Processor {
             Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
                     FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
 
+    @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
     }
 
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
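
@Override costs nothing at runtime but buys a compile-time guarantee: if the annotated method ever stops matching a superclass or interface signature, compilation fails instead of silently introducing a new overload. A tiny illustration (class and method names are hypothetical):

    public class OverrideSketch {
        static class Base {
            protected void onTrigger() { }
        }

        static class Sub extends Base {
            @Override
            protected void onTrigger() { }    // compiles: genuinely overrides Base.onTrigger()

            // @Override
            // protected void onTriger() { }  // uncommented, the typo would fail to compile
        }

        public static void main(String[] args) {
            new Sub().onTrigger();
        }
    }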


[17/41] nifi git commit: NIFI-774 Ignore integration style test

Posted by ma...@apache.org.
NIFI-774 Ignore integration style test


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49ee06b0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49ee06b0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49ee06b0

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 49ee06b0a256b9b4b7550acccb6facceb3c31c21
Parents: fad81d8
Author: danbress <db...@onyxconsults.com>
Authored: Mon Oct 12 19:59:46 2015 -0400
Committer: danbress <db...@onyxconsults.com>
Committed: Mon Oct 12 19:59:46 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/49ee06b0/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index 603b06a..509a31f 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.amazonaws.auth.PropertiesCredentials;
@@ -38,7 +39,7 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
 
-//@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
 public class TestDeleteS3Object {
 
     private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";


[20/41] nifi git commit: NIFI-995 TestGetFile.testAttributes() failed on an NTFS partition in Linux because Files.setPosixFilePermissions() had no effect on the files and did not throw an Exception. closes #95. Signed off by Tony Kurc (tkurc@apache.org)

Posted by ma...@apache.org.
NIFI-995 TestGetFile.testAttributes() failed on an NTFS partition in Linux because Files.setPosixFilePermissions() had no effect on the files and did not throw an Exception. closes #95. Signed off by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/22924c65
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/22924c65
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/22924c65

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 22924c656bfdcf749bd618f055ffa816aca17ae5
Parents: 9a8d763
Author: Joe <jo...@impresstv.com>
Authored: Sat Oct 17 10:46:49 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 17 10:46:49 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TestGetFile.java   | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/22924c65/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFile.java
index 018cbdc..eb8a764 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFile.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -33,6 +34,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
@@ -158,8 +160,23 @@ public class TestGetFile {
 
         boolean verifyPermissions = false;
         try {
-            Files.setPosixFilePermissions(targetPath, PosixFilePermissions.fromString("r--r-----"));
-            verifyPermissions = true;
+            // If you mount an NTFS partition in Linux, you are unable to change the permissions of the files,
+            // because every file has the same permissions, controlled by the 'fmask' and 'dmask' mount options.
+            // Executing a chmod command will not fail, but it does not change the file's permissions.
+            // From a Java perspective the NTFS mount point, as a FileStore, supports the 'unix' and
+            // 'posix' file attribute views, but setPosixFilePermissions() has no effect.
+            //
+            // If verifyPermissions were set to true without the following extra check, the test case
+            // would fail when the NiFi source tree sits on an NTFS mount point in Linux.
+            // The purpose of the extra check is to ensure that setPosixFilePermissions() actually
+            // changed the file's permissions, and to set verifyPermissions only once that is confirmed.
+            Set<PosixFilePermission> perms = PosixFilePermissions.fromString("r--r-----");
+            Files.setPosixFilePermissions(targetPath, perms);
+            Set<PosixFilePermission> permsAfterSet = Files.getPosixFilePermissions(targetPath);
+            if (perms.equals(permsAfterSet)) {
+                verifyPermissions = true;
+            }
+
         } catch (Exception donothing) {
         }
 


[08/41] nifi git commit: Modify an error message

Posted by ma...@apache.org.
Modify an error message


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f718b4bf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f718b4bf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f718b4bf

Branch: refs/heads/NIFI-810-InputRequirement
Commit: f718b4bf46f441fde2f02065b6d55ac294fe60ce
Parents: d32a32a
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Mon Sep 7 23:34:22 2015 +0900
Committer: Yu ISHIKAWA <yu...@gmail.com>
Committed: Mon Sep 7 23:34:22 2015 +0900

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f718b4bf/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index 082a80d..603b06a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -56,7 +56,7 @@ public class TestDeleteS3Object {
             CreateBucketRequest request = new CreateBucketRequest(TEST_BUCKET, TEST_REGION);
             client.createBucket(request);
         } catch (final AmazonS3Exception e) {
-            System.out.println(TEST_BUCKET + " already exists.");
+            System.out.println("Can't create the key " + TEST_BUCKET + ":" + e.toString());
         } catch (final IOException e) {
             System.out.println(CREDENTIALS_FILE + " doesn't exist.");
         }


[41/41] nifi git commit: NIFI-810: Merged master into branch

Posted by ma...@apache.org.
NIFI-810: Merged master into branch


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0636f0e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0636f0e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0636f0e7

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 0636f0e731cd28299edd3a6e9db90de5045ab662
Parents: 8e2308b d63cd6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 25 11:02:40 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:02:40 2015 -0400

----------------------------------------------------------------------
 .../src/main/asciidoc/administration-guide.adoc |   4 +-
 .../src/main/java/MyProcessor.java              |  11 +-
 .../nifi/processors/avro/ConvertAvroToJSON.java |  67 ++++-
 .../processors/avro/TestConvertAvroToJSON.java  |  47 ++-
 .../processors/aws/AbstractAWSProcessor.java    |   2 +-
 .../nifi/processors/aws/s3/DeleteS3Object.java  |  98 ++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/aws/s3/TestDeleteS3Object.java   | 141 +++++++++
 .../nifi/controller/FlowUnmarshaller.java       |  77 -----
 .../src/main/resources/FlowConfiguration.xsd    |   2 +-
 .../src/main/resources/bin/nifi.sh              |  96 +++---
 .../canvas/new-controller-service-dialog.jsp    |   1 -
 .../partials/canvas/new-processor-dialog.jsp    |   1 -
 .../canvas/new-reporting-task-dialog.jsp        |   1 -
 .../css/new-controller-service-dialog.css       |   9 -
 .../main/webapp/css/new-processor-dialog.css    |   9 -
 .../webapp/css/new-reporting-task-dialog.css    |   9 -
 .../webapp/js/nf/canvas/nf-canvas-toolbox.js    |  60 ++--
 .../src/main/webapp/js/nf/canvas/nf-settings.js | 140 +++++----
 .../processors/kite/AbstractKiteProcessor.java  |  11 +-
 .../nifi/processors/kite/ConvertCSVToAvro.java  | 296 ++++++++++---------
 .../processors/kite/TestCSVToAvroProcessor.java |  39 +++
 .../nifi-standard-prioritizers/pom.xml          |   4 +
 .../PriorityAttributePrioritizer.java           |   7 +-
 .../PriorityAttributePrioritizerTest.java       |  17 +-
 .../nifi-standard-processors/pom.xml            |   9 +
 .../nifi/processors/standard/ExecuteSQL.java    |   9 +-
 .../nifi/processors/standard/InvokeHTTP.java    |   1 +
 .../nifi/processors/standard/ListenHTTP.java    | 105 ++++---
 .../standard/PutDistributedMapCache.java        | 252 ++++++++++++++++
 .../servlets/ContentAcknowledgmentServlet.java  |   3 +-
 .../standard/servlets/ListenHTTPServlet.java    |   8 +-
 .../processors/standard/util/JdbcCommon.java    |  70 ++++-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/standard/TestGetFile.java   |  21 +-
 .../standard/TestPutDistributedMapCache.java    | 277 +++++++++++++++++
 .../standard/util/TestJdbcCommon.java           |  42 +++
 .../standard/util/TestJdbcTypesDerby.java       | 133 +++++++++
 .../standard/util/TestJdbcTypesH2.java          | 149 ++++++++++
 pom.xml                                         |   2 +-
 40 files changed, 1725 insertions(+), 507 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index b214427,f0ba71a..f0f1630
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@@ -35,7 -36,8 +38,7 @@@ import org.apache.nifi.annotation.behav
  import org.apache.nifi.annotation.behavior.SupportsBatching;
  import org.apache.nifi.annotation.behavior.WritesAttribute;
  import org.apache.nifi.annotation.documentation.CapabilityDescription;
--import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.components.PropertyDescriptor;
  import org.apache.nifi.flowfile.FlowFile;
  import org.apache.nifi.flowfile.attributes.CoreAttributes;
  import org.apache.nifi.processor.AbstractProcessor;
@@@ -47,8 -50,7 +51,7 @@@ import org.apache.nifi.processor.io.Str
  
  @SideEffectFree
  @SupportsBatching
- @Tags({ "json", "avro", "binary" })
 -@Tags({"json", "avro", "binary"})
 +@InputRequirement(Requirement.INPUT_REQUIRED)
  @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
      + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
      + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "

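For orientation, the heart of such a conversion can be sketched with the plain Avro APIs: read each record from a binary Avro container and emit its JSON rendering. This is a simplified illustration, not the processor's actual code; the input file name is hypothetical and GenericRecord#toString() stands in for the processor's JSON serialization:

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class AvroToJsonSketch {
        public static void main(String[] args) throws IOException {
            // The schema travels inside the Avro container, so no schema argument is needed.
            File avroFile = new File("records.avro"); // hypothetical input
            try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
                while (reader.hasNext()) {
                    // toString() renders the record as JSON text; the Avro schema itself
                    // is not carried into the output, matching the description above.
                    System.out.println(reader.next());
                }
            }
        }
    }
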
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6f126aa,ea84daa..43b33ff
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@@ -30,8 -30,7 +30,9 @@@ import org.apache.avro.Schema
  import org.apache.avro.file.CodecFactory;
  import org.apache.avro.file.DataFileWriter;
  import org.apache.avro.generic.GenericData.Record;
+ import org.apache.commons.lang3.StringEscapeUtils;
 +import org.apache.nifi.annotation.behavior.InputRequirement;
 +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
  import org.apache.nifi.annotation.documentation.CapabilityDescription;
  import org.apache.nifi.annotation.documentation.Tags;
  import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@@ -68,114 -66,118 +69,108 @@@ public class ConvertCSVToAvro extends A
  
      private static final Validator CHAR_VALIDATOR = new Validator() {
          @Override
--        public ValidationResult validate(String subject, String input,
--                ValidationContext context) {
++        public ValidationResult validate(String subject, String input, ValidationContext context) {
+             // Allows special, escaped characters as input, which are then unescaped and converted to a single character.
+             // Examples of special characters: \t (or \u0009), \f.
+             input = unescapeString(input);
+ 
              return new ValidationResult.Builder()
--                    .subject(subject)
--                    .input(input)
-                     .explanation("Only single characters are supported")
-                     .valid(input.length() == 1)
 -                    .explanation("Only non-null single characters are supported")
 -                    .valid(input.length() == 1 && input.charAt(0) != 0)
--                    .build();
++                .subject(subject)
++                .input(input)
++                .explanation("Only non-null single characters are supported")
++                .valid(input.length() == 1 && input.charAt(0) != 0)
++                .build();
          }
      };
  
      private static final Relationship SUCCESS = new Relationship.Builder()
--            .name("success")
--            .description("Avro content that was converted successfully from CSV")
--            .build();
++        .name("success")
++        .description("Avro content that was converted successfully from CSV")
++        .build();
  
      private static final Relationship FAILURE = new Relationship.Builder()
--            .name("failure")
--            .description("CSV content that could not be processed")
--            .build();
++        .name("failure")
++        .description("CSV content that could not be processed")
++        .build();
  
      private static final Relationship INCOMPATIBLE = new Relationship.Builder()
--            .name("incompatible")
--            .description("CSV content that could not be converted")
--            .build();
++        .name("incompatible")
++        .description("CSV content that could not be converted")
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor SCHEMA
--            = new PropertyDescriptor.Builder()
--            .name("Record schema")
--            .description("Outgoing Avro schema for each record created from a CSV row")
--            .addValidator(SCHEMA_VALIDATOR)
--            .expressionLanguageSupported(true)
--            .required(true)
--            .build();
++    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
++        .name("Record schema")
++        .description("Outgoing Avro schema for each record created from a CSV row")
++        .addValidator(SCHEMA_VALIDATOR)
++        .expressionLanguageSupported(true)
++        .required(true)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor CHARSET
--            = new PropertyDescriptor.Builder()
--            .name("CSV charset")
--            .description("Character set for CSV files")
--            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
--            .defaultValue(DEFAULTS.charset)
--            .build();
++    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
++        .name("CSV charset")
++        .description("Character set for CSV files")
++        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
++        .defaultValue(DEFAULTS.charset)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor DELIMITER
--            = new PropertyDescriptor.Builder()
--            .name("CSV delimiter")
--            .description("Delimiter character for CSV records")
--            .addValidator(CHAR_VALIDATOR)
--            .defaultValue(DEFAULTS.delimiter)
--            .build();
++    static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
++        .name("CSV delimiter")
++        .description("Delimiter character for CSV records")
++        .addValidator(CHAR_VALIDATOR)
++        .defaultValue(DEFAULTS.delimiter)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor QUOTE
--            = new PropertyDescriptor.Builder()
--            .name("CSV quote character")
--            .description("Quote character for CSV values")
--            .addValidator(CHAR_VALIDATOR)
--            .defaultValue(DEFAULTS.quote)
--            .build();
++    static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
++        .name("CSV quote character")
++        .description("Quote character for CSV values")
++        .addValidator(CHAR_VALIDATOR)
++        .defaultValue(DEFAULTS.quote)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor ESCAPE
--            = new PropertyDescriptor.Builder()
--            .name("CSV escape character")
--            .description("Escape character for CSV values")
--            .addValidator(CHAR_VALIDATOR)
--            .defaultValue(DEFAULTS.escape)
--            .build();
++    static final PropertyDescriptor ESCAPE = new PropertyDescriptor.Builder()
++        .name("CSV escape character")
++        .description("Escape character for CSV values")
++        .addValidator(CHAR_VALIDATOR)
++        .defaultValue(DEFAULTS.escape)
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor HAS_HEADER
--            = new PropertyDescriptor.Builder()
--            .name("Use CSV header line")
--            .description("Whether to use the first line as a header")
--            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
--            .defaultValue(String.valueOf(DEFAULTS.useHeader))
--            .build();
++    static final PropertyDescriptor HAS_HEADER = new PropertyDescriptor.Builder()
++        .name("Use CSV header line")
++        .description("Whether to use the first line as a header")
++        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
++        .defaultValue(String.valueOf(DEFAULTS.useHeader))
++        .build();
  
      @VisibleForTesting
--    static final PropertyDescriptor LINES_TO_SKIP
--            = new PropertyDescriptor.Builder()
--            .name("Lines to skip")
--            .description("Number of lines to skip before reading header or data")
--            .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
--            .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
--            .build();
--
--    private static final List<PropertyDescriptor> PROPERTIES
--            = ImmutableList.<PropertyDescriptor>builder()
--            .addAll(AbstractKiteProcessor.getProperties())
--            .add(SCHEMA)
--            .add(CHARSET)
--            .add(DELIMITER)
--            .add(QUOTE)
--            .add(ESCAPE)
--            .add(HAS_HEADER)
--            .add(LINES_TO_SKIP)
--            .build();
--
--    private static final Set<Relationship> RELATIONSHIPS
--            = ImmutableSet.<Relationship>builder()
--            .add(SUCCESS)
--            .add(FAILURE)
--            .add(INCOMPATIBLE)
--            .build();
++    static final PropertyDescriptor LINES_TO_SKIP = new PropertyDescriptor.Builder()
++        .name("Lines to skip")
++        .description("Number of lines to skip before reading header or data")
++        .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
++        .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
++        .build();
++
++    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor> builder()
++        .addAll(AbstractKiteProcessor.getProperties())
++        .add(SCHEMA)
++        .add(CHARSET)
++        .add(DELIMITER)
++        .add(QUOTE)
++        .add(ESCAPE)
++        .add(HAS_HEADER)
++        .add(LINES_TO_SKIP)
++        .build();
++
++    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
++        .add(SUCCESS)
++        .add(FAILURE)
++        .add(INCOMPATIBLE)
++        .build();
  
      // Immutable configuration
      @VisibleForTesting
@@@ -196,26 -198,26 +191,26 @@@
          super.setDefaultConfiguration(context);
  
          this.props = new CSVProperties.Builder()
--                .charset(context.getProperty(CHARSET).getValue())
--                .delimiter(context.getProperty(DELIMITER).getValue())
--                .quote(context.getProperty(QUOTE).getValue())
--                .escape(context.getProperty(ESCAPE).getValue())
--                .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
--                .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
--                .build();
++            .charset(context.getProperty(CHARSET).getValue())
++            .delimiter(context.getProperty(DELIMITER).getValue())
++            .quote(context.getProperty(QUOTE).getValue())
++            .escape(context.getProperty(ESCAPE).getValue())
++            .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
++            .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
++            .build();
      }
  
      @Override
      public void onTrigger(ProcessContext context, final ProcessSession session)
--            throws ProcessException {
++        throws ProcessException {
          FlowFile incomingCSV = session.get();
          if (incomingCSV == null) {
              return;
          }
  
          String schemaProperty = context.getProperty(SCHEMA)
--                .evaluateAttributeExpressions(incomingCSV)
--                .getValue();
++            .evaluateAttributeExpressions(incomingCSV)
++            .getValue();
          final Schema schema;
          try {
              schema = getSchema(schemaProperty, DefaultConfiguration.get());
@@@ -225,78 -227,85 +220,87 @@@
              return;
          }
  
--        final DataFileWriter<Record> writer = new DataFileWriter<>(
--                AvroUtil.newDatumWriter(schema, Record.class));
--        writer.setCodec(CodecFactory.snappyCodec());
++        try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
++            writer.setCodec(CodecFactory.snappyCodec());
  
--        try {
--            final LongHolder written = new LongHolder(0L);
--            final FailureTracker failures = new FailureTracker();
--
--            FlowFile badRecords = session.clone(incomingCSV);
--            FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
--                @Override
--                public void process(InputStream in, OutputStream out) throws IOException {
--                    try (CSVFileReader<Record> reader = new CSVFileReader<>(
++            try {
++                final LongHolder written = new LongHolder(0L);
++                final FailureTracker failures = new FailureTracker();
++
++                FlowFile badRecords = session.clone(incomingCSV);
++                FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() {
++                    @Override
++                    public void process(InputStream in, OutputStream out) throws IOException {
++                        try (CSVFileReader<Record> reader = new CSVFileReader<>(
                              in, props, schema, Record.class)) {
--                        reader.initialize();
--                        try (DataFileWriter<Record> w = writer.create(schema, out)) {
--                            while (reader.hasNext()) {
--                                try {
--                                    Record record = reader.next();
--                                    w.append(record);
--                                    written.incrementAndGet();
--                                } catch (DatasetRecordException e) {
--                                    failures.add(e);
++                            reader.initialize();
++                            try (DataFileWriter<Record> w = writer.create(schema, out)) {
++                                while (reader.hasNext()) {
++                                    try {
++                                        Record record = reader.next();
++                                        w.append(record);
++                                        written.incrementAndGet();
++                                    } catch (DatasetRecordException e) {
++                                        failures.add(e);
++                                    }
                                  }
                              }
                          }
                      }
--                }
--            });
++                });
  
--            long errors = failures.count();
++                long errors = failures.count();
  
--            session.adjustCounter("Converted records", written.get(),
++                session.adjustCounter("Converted records", written.get(),
                      false /* update only if file transfer is successful */);
--            session.adjustCounter("Conversion errors", errors,
++                session.adjustCounter("Conversion errors", errors,
                      false /* update only if file transfer is successful */);
  
--            if (written.get() > 0L) {
--                session.transfer(outgoingAvro, SUCCESS);
++                if (written.get() > 0L) {
++                    session.transfer(outgoingAvro, SUCCESS);
  
--                if (errors > 0L) {
--                    getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
--                            new Object[] { errors, errors + written.get() });
--                    badRecords = session.putAttribute(
++                    if (errors > 0L) {
++                        getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
++                            new Object[] {errors, errors + written.get()});
++                        badRecords = session.putAttribute(
                              badRecords, "errors", failures.summary());
--                    session.transfer(badRecords, INCOMPATIBLE);
--                } else {
--                    session.remove(badRecords);
--                }
++                        session.transfer(badRecords, INCOMPATIBLE);
++                    } else {
++                        session.remove(badRecords);
++                    }
  
--            } else {
--                session.remove(outgoingAvro);
++                } else {
++                    session.remove(outgoingAvro);
  
--                if (errors > 0L) {
--                    getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
--                            new Object[] { errors, errors });
--                    badRecords = session.putAttribute(
++                    if (errors > 0L) {
++                        getLogger().warn("Failed to convert {}/{} records from CSV to Avro",
++                            new Object[] {errors, errors});
++                        badRecords = session.putAttribute(
                              badRecords, "errors", failures.summary());
--                } else {
--                    badRecords = session.putAttribute(
++                    } else {
++                        badRecords = session.putAttribute(
                              badRecords, "errors", "No incoming records");
++                    }
++
++                    session.transfer(badRecords, FAILURE);
                  }
  
--                session.transfer(badRecords, FAILURE);
++            } catch (ProcessException | DatasetIOException e) {
++                getLogger().error("Failed reading or writing", e);
++                session.transfer(incomingCSV, FAILURE);
++            } catch (DatasetException e) {
++                getLogger().error("Failed to read FlowFile", e);
++                session.transfer(incomingCSV, FAILURE);
              }
 -
 -        } catch (ProcessException | DatasetIOException e) {
 -            getLogger().error("Failed reading or writing", e);
 -            session.transfer(incomingCSV, FAILURE);
 -        } catch (DatasetException e) {
 -            getLogger().error("Failed to read FlowFile", e);
 -            session.transfer(incomingCSV, FAILURE);
++        } catch (final IOException ioe) {
++            throw new RuntimeException("Unable to close Avro Writer", ioe);
+         }
+     }
  
-         } catch (ProcessException | DatasetIOException e) {
-             getLogger().error("Failed reading or writing", e);
-             session.transfer(incomingCSV, FAILURE);
-         } catch (DatasetException e) {
-             getLogger().error("Failed to read FlowFile", e);
-             session.transfer(incomingCSV, FAILURE);
+     private static String unescapeString(String input) {
+         if (input.length() > 1) {
+             input = StringEscapeUtils.unescapeJava(input);
          }
+         return input;
      }
  }

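The validator change in the hunk above lets the delimiter, quote, and escape properties accept escaped sequences such as \t or \u0009 by unescaping the input before the single-character check. A minimal sketch of that idea using the same commons-lang3 helper (the class and method names are illustrative, not NiFi API):

    import org.apache.commons.lang3.StringEscapeUtils;

    public class CharPropertySketch {
        // Unescapes inputs like the two-character string "\t" to a real tab,
        // then enforces a single non-null character, as the validator does.
        static char toSingleChar(String input) {
            String unescaped = input.length() > 1 ? StringEscapeUtils.unescapeJava(input) : input;
            if (unescaped.length() != 1 || unescaped.charAt(0) == 0) {
                throw new IllegalArgumentException("Only non-null single characters are supported: " + input);
            }
            return unescaped.charAt(0);
        }

        public static void main(String[] args) {
            System.out.println((int) toSingleChar("\\t"));     // 9: the tab character
            System.out.println((int) toSingleChar("\\u0009")); // also 9
        }
    }
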
http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 258e122,9ad1703..88b6666
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@@ -63,9 -61,8 +63,9 @@@ import org.eclipse.jetty.servlet.Servle
  import org.eclipse.jetty.util.ssl.SslContextFactory;
  import org.eclipse.jetty.util.thread.QueuedThreadPool;
  
 +@InputRequirement(Requirement.INPUT_FORBIDDEN)
  @Tags({"ingest", "http", "https", "rest", "listen"})
- @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+ @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
  public class ListenHTTP extends AbstractSessionFactoryProcessor {
  
      private Set<Relationship> relationships;

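The annotations added throughout this branch follow one rule: source-style processors such as ListenHTTP declare INPUT_FORBIDDEN, while transform processors such as ConvertAvroToJSON declare INPUT_REQUIRED. A minimal sketch of how a custom processor would carry the annotation (this placeholder class is not part of the branch):

    import org.apache.nifi.annotation.behavior.InputRequirement;
    import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    // INPUT_REQUIRED tells the framework this processor is only valid with an
    // upstream connection; INPUT_FORBIDDEN would reject any incoming connection.
    @InputRequirement(Requirement.INPUT_REQUIRED)
    public class MyTransformProcessor extends AbstractProcessor {
        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // transform logic would go here
        }
    }
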

[03/41] nifi git commit: Remove `getSupportedDynamicPropertyDescriptor`

Posted by ma...@apache.org.
Remove `getSupportedDynamicPropertyDescriptor`


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6cbc6db8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6cbc6db8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6cbc6db8

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 6cbc6db860d730954ce8cadc56b6d8446fe085d8
Parents: 7e68349
Author: Yuu ISHIKAWA <yu...@gmail.com>
Authored: Wed Sep 2 06:49:46 2015 +0900
Committer: Yuu ISHIKAWA <yu...@gmail.com>
Committed: Wed Sep 2 06:49:46 2015 +0900

----------------------------------------------------------------------
 .../org/apache/nifi/processors/aws/s3/DeleteS3Object.java   | 9 ---------
 1 file changed, 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc6db8/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 3be7a15..85fc70e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -60,15 +60,6 @@ public class DeleteS3Object extends AbstractS3Processor {
         return properties;
     }
 
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .dynamic(true)
-                .build();
-    }
-
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {


[40/41] nifi git commit: Merge branch 'NIFI-1010'

Posted by ma...@apache.org.
Merge branch 'NIFI-1010'


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d63cd6bd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d63cd6bd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d63cd6bd

Branch: refs/heads/NIFI-810-InputRequirement
Commit: d63cd6bd2f9adc3dfc4c7fe168e38081a5b13564
Parents: 0fc5d30 88fc8d2
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 14:39:16 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 14:39:16 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/util/JdbcCommon.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d63cd6bd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------


[23/41] nifi git commit: NIFI-1016: Use centralized priority attribute name in PriorityAttributePrioritizer. closes #98.

Posted by ma...@apache.org.
NIFI-1016: Use centralized priority attribute name in PriorityAttributePrioritizer. closes #98.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad73a23a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad73a23a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad73a23a

Branch: refs/heads/NIFI-810-InputRequirement
Commit: ad73a23affe32e77a5295c829a4958da440d24cb
Parents: bd47f36
Author: Joe <jo...@impresstv.com>
Authored: Thu Oct 1 14:14:37 2015 +0200
Committer: joewitt <jo...@apache.org>
Committed: Sun Oct 18 19:42:46 2015 -0400

----------------------------------------------------------------------
 .../nifi-standard-bundle/nifi-standard-prioritizers/pom.xml   | 4 ++++
 .../apache/nifi/prioritizer/PriorityAttributePrioritizer.java | 7 +++----
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ad73a23a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/pom.xml
index 598c0db..ede0d13 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/pom.xml
@@ -29,6 +29,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/ad73a23a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java
index 75470c8..3d27930 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizer.java
@@ -20,6 +20,7 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 
 /**
  * This prioritizer checks each FlowFile for a "priority" attribute and lets
@@ -33,8 +34,6 @@ import org.apache.nifi.flowfile.FlowFilePrioritizer;
  */
 public class PriorityAttributePrioritizer implements FlowFilePrioritizer {
 
-    public static final String PRIORITY_ATTR = "priority";
-
     private static final Pattern intPattern = Pattern.compile("-?\\d+");
 
     @Override
@@ -47,8 +46,8 @@ public class PriorityAttributePrioritizer implements FlowFilePrioritizer {
             return 1;
         }
 
-        String o1Priority = o1.getAttribute(PRIORITY_ATTR);
-        String o2Priority = o2.getAttribute(PRIORITY_ATTR);
+        String o1Priority = o1.getAttribute(CoreAttributes.PRIORITY.key());
+        String o2Priority = o2.getAttribute(CoreAttributes.PRIORITY.key());
         if (o1Priority == null && o2Priority == null) {
             return -1; // this is not 0 to match FirstInFirstOut
         } else if (o2Priority == null) {


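The comparison continues past this excerpt; its general shape is that a missing priority attribute sorts first, and two present values compare numerically when both match the integer pattern, lexically otherwise. A self-contained approximation of that ordering (a sketch, not the full class):

    import java.math.BigInteger;
    import java.util.Comparator;
    import java.util.regex.Pattern;

    public class PrioritySketch {
        private static final Pattern INT_PATTERN = Pattern.compile("-?\\d+");

        // Approximates the prioritizer's ordering on raw "priority" attribute values.
        static final Comparator<String> PRIORITY_ORDER = (a, b) -> {
            if (a == null && b == null) return 0;
            if (a == null) return -1;   // missing priority sorts first
            if (b == null) return 1;
            if (INT_PATTERN.matcher(a).matches() && INT_PATTERN.matcher(b).matches()) {
                return new BigInteger(a).compareTo(new BigInteger(b)); // numeric order
            }
            return a.compareTo(b); // lexical order
        };

        public static void main(String[] args) {
            System.out.println(PRIORITY_ORDER.compare("2", "10") < 0);    // true: numeric
            System.out.println(PRIORITY_ORDER.compare("abc", "abd") < 0); // true: lexical
        }
    }
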
[11/41] nifi git commit: NIFI-988: Test cases for PutDistributedMapCache

Posted by ma...@apache.org.
NIFI-988: Test cases for PutDistributedMapCache


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ee7d89cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ee7d89cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ee7d89cb

Branch: refs/heads/NIFI-810-InputRequirement
Commit: ee7d89cb01d4661cfff2c4f0d093e38758680a56
Parents: 6b1328f
Author: Joe <jo...@impresstv.com>
Authored: Wed Sep 23 14:32:37 2015 +0200
Committer: Joe <jo...@impresstv.com>
Committed: Wed Sep 23 14:32:37 2015 +0200

----------------------------------------------------------------------
 .../standard/TestPutDistributedMapCache.java    | 280 +++++++++++++++++++
 1 file changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ee7d89cb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
new file mode 100644
index 0000000..8347e7f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestPutDistributedMapCache {
+
+    private TestRunner runner;
+    private MockCacheClient service;
+    private PutDistributedMapCache processor;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(PutDistributedMapCache.class);
+
+        service = new MockCacheClient();
+        runner.addControllerService("service", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutDistributedMapCache.DISTRIBUTED_CACHE_SERVICE, "service");
+    }
+
+    @Test
+    public void testNoCacheKey() throws InitializationException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+        runner.enqueue(new byte[]{});
+
+        runner.run();
+
+        // no cache key attribute
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testSingleFlowFile() throws InitializationException, IOException {
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "1");
+
+        String flowFileContent = "content";
+        runner.enqueue(flowFileContent.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+        byte[] value = service.get("1", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(flowFileContent, new String(value, "UTF-8"));
+
+        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(flowFileContent);
+        runner.clearTransferState();
+
+    }
+
+    @Test
+    public void testNothingToCache() throws InitializationException, IOException {
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "2");
+
+        // flow file without content
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testMaxCacheEntrySize() throws InitializationException, IOException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${uuid}");
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "10 B");
+
+        // max length is 10 bytes, flow file content is 20 bytes
+        String flowFileContent = "contentwhichistoobig";
+        runner.enqueue(flowFileContent.getBytes("UTF-8"));
+
+        runner.run();
+
+        // flow file content exceeds the configured max cache entry size
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+
+        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_FAILURE).get(0);
+        outputFlowFile.assertAttributeNotExists("cached");
+        outputFlowFile.assertContentEquals(flowFileContent);
+
+
+        runner.clearTransferState();
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "1 MB");
+    }
+
+    @Test
+    public void testCacheStrategyReplace() throws InitializationException, IOException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+        runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_REPLACE.getValue());
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "replaceme");
+
+        String original = "original";
+        runner.enqueue(original.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+
+        MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(original);
+
+        runner.clearTransferState();
+        byte[] value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(original, new String(value, "UTF-8"));
+
+        String replaced = "replaced";
+        runner.enqueue(replaced.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+
+        outputFlowFile = runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(replaced);
+
+        runner.clearTransferState();
+
+        //we expect that the cache entry is replaced
+        value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(replaced, new String(value, "UTF-8"));
+    }
+
+    @Test
+    public void testCacheStrategyKeepOriginal() throws InitializationException, IOException {
+
+        runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+        runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue());
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("caheKeyAttribute", "replaceme");
+
+        String original = "original";
+        runner.enqueue(original.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_SUCCESS, 1);
+
+        MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "true");
+        outputFlowFile.assertContentEquals(original);
+
+        runner.clearTransferState();
+        byte[] value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(original, new String(value, "UTF-8"));
+
+        String replaced = "replaced";
+        runner.enqueue(replaced.getBytes("UTF-8"), props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDistributedMapCache.REL_FAILURE, 1);
+
+        outputFlowFile = runner.getFlowFilesForRelationship(PutDistributedMapCache.REL_FAILURE).get(0);
+        outputFlowFile.assertAttributeEquals("cached", "false");
+        outputFlowFile.assertContentEquals(replaced);
+
+        runner.clearTransferState();
+
+        //we expect that the cache entry is NOT replaced
+        value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
+        assertEquals(original, new String(value, "UTF-8"));
+    }
+
+    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
+        private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+        private boolean failOnCalls = false;
+
+        private void verifyNotFail() throws IOException {
+            if (failOnCalls) {
+                throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
+            }
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            final Object retValue = values.putIfAbsent(key, value);
+            return (retValue == null);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                                          final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.putIfAbsent(key, value);
+        }
+
+        @Override
+        public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+            verifyNotFail();
+            return values.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            values.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+            verifyNotFail();
+            values.remove(key);
+            return true;
+        }
+    }
+
+
+}
\ No newline at end of file

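The two cache update strategies exercised by these tests reduce to a familiar ConcurrentMap distinction: 'keep original' behaves like putIfAbsent(), while 'replace' behaves like put(). A tiny standalone illustration of why the tests expect different winners (plain JDK, no NiFi types):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class CacheStrategySketch {
        public static void main(String[] args) {
            ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();

            cache.putIfAbsent("replaceme", "original");
            cache.putIfAbsent("replaceme", "replaced"); // no-op: the original entry wins
            System.out.println(cache.get("replaceme")); // prints "original"

            cache.put("replaceme", "replaced");         // unconditional overwrite
            System.out.println(cache.get("replaceme")); // prints "replaced"
        }
    }
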

[26/41] nifi git commit: NIFI-1046 shell clean up: remove backticks from nifi.sh. Reviewed by Tony Kurc (tkurc@apache.org)

Posted by ma...@apache.org.
NIFI-1046 shell clean up: remove backticks from nifi.sh. Reviewed by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58d9b025
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58d9b025
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58d9b025

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 58d9b0251734f124dae40443a295821ca2f653b3
Parents: 9200542
Author: Alex Moundalexis <al...@cloudera.com>
Authored: Mon Oct 19 10:42:28 2015 -0700
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 19 20:33:52 2015 -0400

----------------------------------------------------------------------
 .../src/main/resources/bin/nifi.sh              | 22 ++++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/58d9b025/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index 2d60d5a..f8d90c4 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -21,8 +21,8 @@
 
 # Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
 
-NIFI_HOME=`cd $(dirname "$0") && cd .. && pwd`
-PROGNAME=`basename "$0"`
+NIFI_HOME=$(cd $(dirname "$0") && cd .. && pwd)
+PROGNAME=$(basename "$0")
 
 
 warn() {
@@ -40,7 +40,7 @@ detectOS() {
     aix=false;
     os400=false;
     darwin=false;
-    case "`uname`" in
+    case "$(uname)" in
         CYGWIN*)
             cygwin=true
             ;;
@@ -69,7 +69,7 @@ unlimitFD() {
 
     # Increase the maximum file descriptors if we can
     if [ "$os400" = "false" ] && [ "$cygwin" = "false" ]; then
-        MAX_FD_LIMIT=`ulimit -H -n`
+        MAX_FD_LIMIT=$(ulimit -H -n)
         if [ "$MAX_FD_LIMIT" != 'unlimited' ]; then
             if [ $? -eq 0 ]; then
                 if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ]; then
@@ -94,12 +94,12 @@ unlimitFD() {
 locateJava() {
     # Setup the Java Virtual Machine
     if $cygwin ; then
-        [ -n "$JAVA" ] && JAVA=`cygpath --unix "$JAVA"`
-        [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+        [ -n "$JAVA" ] && JAVA=$(cygpath --unix "$JAVA")
+        [ -n "$JAVA_HOME" ] && JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
     fi
 
     if [ "x$JAVA" = "x" ] && [ -r /etc/gentoo-release ] ; then
-        JAVA_HOME=`java-config --jre-home`
+        JAVA_HOME=$(java-config --jre-home)
     fi
     if [ "x$JAVA" = "x" ]; then
         if [ "x$JAVA_HOME" != "x" ]; then
@@ -109,8 +109,8 @@ locateJava() {
             JAVA="$JAVA_HOME/bin/java"
         else
             warn "JAVA_HOME not set; results may vary"
-            JAVA=`type java`
-            JAVA=`expr "$JAVA" : '.* \(/.*\)$'`
+            JAVA=$(type java)
+            JAVA=$(expr "$JAVA" : '.* \(/.*\)$')
             if [ "x$JAVA" = "x" ]; then
                 die "java command not found"
             fi
@@ -160,8 +160,8 @@ run() {
             exit 1
         fi;
 
-        NIFI_HOME=`cygpath --path --windows "$NIFI_HOME"`
-        BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"`
+        NIFI_HOME=$(cygpath --path --windows "$NIFI_HOME")
+        BOOTSTRAP_CONF=$(cygpath --path --windows "$BOOTSTRAP_CONF")
     else
         if [ -n "$run_as" ]; then
             if id -u "$run_as" >/dev/null 2>&1; then


[02/41] nifi git commit: Ignore the test suite

Posted by ma...@apache.org.
Ignore the test suite


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7e683493
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7e683493
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7e683493

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 7e6834937b01f035666b113a6c2735e4156ea4f3
Parents: d1dbd37
Author: Yuu ISHIKAWA <yu...@gmail.com>
Authored: Tue Sep 1 22:36:50 2015 +0900
Committer: Yuu ISHIKAWA <yu...@gmail.com>
Committed: Tue Sep 1 22:36:50 2015 +0900

----------------------------------------------------------------------
 .../org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7e683493/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index dfe6edb..cac55e5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -25,17 +25,17 @@ import java.util.Map;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.amazonaws.auth.PropertiesCredentials;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.*;
+
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
 
-@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+//@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
 public class TestDeleteS3Object {
 
     private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";


[28/41] nifi git commit: NIFI-1046 shell clean up: +variable braces for consistency. Reviewed by Tony Kurc (tkurc@apache.org). This closes #106

Posted by ma...@apache.org.
NIFI-1046 shell clean up: +variable braces for consistency. Reviewed by Tony Kurc (tkurc@apache.org). This closes #106


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b8090311
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b8090311
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b8090311

Branch: refs/heads/NIFI-810-InputRequirement
Commit: b809031195405c04ec7a950c293482225f64e8dc
Parents: 9aa716b
Author: Alex Moundalexis <al...@cloudera.com>
Authored: Mon Oct 19 11:14:32 2015 -0700
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 19 20:36:58 2015 -0400

----------------------------------------------------------------------
 .../src/main/resources/bin/nifi.sh              | 76 ++++++++++----------
 1 file changed, 38 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b8090311/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index 1958e34..8863982 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -57,35 +57,35 @@ detectOS() {
                 ;;
     esac
     # For AIX, set an environment variable
-    if $aix; then
+    if ${aix}; then
          export LDR_CNTRL=MAXDATA=0xB0000000@DSA
-         echo $LDR_CNTRL
+         echo ${LDR_CNTRL}
     fi
 }
 
 unlimitFD() {
     # Use the maximum available, or set MAX_FD != -1 to use that
-    if [ "x$MAX_FD" = "x" ]; then
+    if [ "x${MAX_FD}" = "x" ]; then
         MAX_FD="maximum"
     fi
 
     # Increase the maximum file descriptors if we can
-    if [ "$os400" = "false" ] && [ "$cygwin" = "false" ]; then
+    if [ "${os400}" = "false" ] && [ "${cygwin}" = "false" ]; then
         MAX_FD_LIMIT=$(ulimit -H -n)
-        if [ "$MAX_FD_LIMIT" != 'unlimited' ]; then
+        if [ "${MAX_FD_LIMIT}" != 'unlimited' ]; then
             if [ $? -eq 0 ]; then
-                if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ]; then
+                if [ "${MAX_FD}" = "maximum" -o "${MAX_FD}" = "max" ]; then
                     # use the system max
-                    MAX_FD="$MAX_FD_LIMIT"
+                    MAX_FD="${MAX_FD_LIMIT}"
                 fi
 
-                ulimit -n $MAX_FD > /dev/null
+                ulimit -n ${MAX_FD} > /dev/null
                 # echo "ulimit -n" `ulimit -n`
                 if [ $? -ne 0 ]; then
-                    warn "Could not set maximum file descriptor limit: $MAX_FD"
+                    warn "Could not set maximum file descriptor limit: ${MAX_FD}"
                 fi
             else
-                warn "Could not query system maximum file descriptor limit: $MAX_FD_LIMIT"
+                warn "Could not query system maximum file descriptor limit: ${MAX_FD_LIMIT}"
             fi
         fi
     fi
@@ -96,24 +96,24 @@ unlimitFD() {
 locateJava() {
     # Setup the Java Virtual Machine
     if $cygwin ; then
-        [ -n "$JAVA" ] && JAVA=$(cygpath --unix "$JAVA")
-        [ -n "$JAVA_HOME" ] && JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+        [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
+        [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
     fi
 
-    if [ "x$JAVA" = "x" ] && [ -r /etc/gentoo-release ] ; then
+    if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
         JAVA_HOME=$(java-config --jre-home)
     fi
-    if [ "x$JAVA" = "x" ]; then
-        if [ "x$JAVA_HOME" != "x" ]; then
-            if [ ! -d "$JAVA_HOME" ]; then
-                die "JAVA_HOME is not valid: $JAVA_HOME"
+    if [ "x${JAVA}" = "x" ]; then
+        if [ "x${JAVA_HOME}" != "x" ]; then
+            if [ ! -d "${JAVA_HOME}" ]; then
+                die "JAVA_HOME is not valid: ${JAVA_HOME}"
             fi
-            JAVA="$JAVA_HOME/bin/java"
+            JAVA="${JAVA_HOME}/bin/java"
         else
             warn "JAVA_HOME not set; results may vary"
             JAVA=$(type java)
-            JAVA=$(expr "$JAVA" : '.* \(/.*\)$')
-            if [ "x$JAVA" = "x" ]; then
+            JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
+            if [ "x${JAVA}" = "x" ]; then
                 die "java command not found"
             fi
         fi
@@ -138,35 +138,35 @@ install() {
                 SVC_NAME=$2
         fi
 
-        SVC_FILE=/etc/init.d/$SVC_NAME
-        cp "$0" "$SVC_FILE"
-        sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": "$SVC_FILE"
-        sed -i s:PROGNAME=.*:PROGNAME="${SCRIPT_NAME}": "$SVC_FILE"
+        SVC_FILE="/etc/init.d/${SVC_NAME}"
+        cp "$0" "${SVC_FILE}"
+        sed -i s:NIFI_HOME=.*:NIFI_HOME="${NIFI_HOME}": "${SVC_FILE}"
+        sed -i s:PROGNAME=.*:PROGNAME="${SCRIPT_NAME}": "${SVC_FILE}"
         rm -f "/etc/rc2.d/S65${SVC_NAME}"
-        ln -s "/etc/init.d/$SVC_NAME" "/etc/rc2.d/S65${SVC_NAME}"
+        ln -s "/etc/init.d/${SVC_NAME}" "/etc/rc2.d/S65${SVC_NAME}"
         rm -f "/etc/rc2.d/K65${SVC_NAME}"
-        ln -s "/etc/init.d/$SVC_NAME" "/etc/rc2.d/K65${SVC_NAME}"
-        echo "Service $SVC_NAME installed"
+        ln -s "/etc/init.d/${SVC_NAME}" "/etc/rc2.d/K65${SVC_NAME}"
+        echo "Service ${SVC_NAME} installed"
 }
 
 
 run() {
-    BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf";
+    BOOTSTRAP_CONF="${NIFI_HOME}/conf/bootstrap.conf";
 
     run_as=$(grep run.as "${BOOTSTRAP_CONF}" | cut -d'=' -f2)
 
     sudo_cmd_prefix=""
     if $cygwin; then
-        if [ -n "$run_as" ]; then
+        if [ -n "${run_as}" ]; then
             echo "The run.as option is not supported in a Cygwin environment. Exiting."
             exit 1
         fi;
 
-        NIFI_HOME=$(cygpath --path --windows "$NIFI_HOME")
-        BOOTSTRAP_CONF=$(cygpath --path --windows "$BOOTSTRAP_CONF")
+        NIFI_HOME=$(cygpath --path --windows "${NIFI_HOME}")
+        BOOTSTRAP_CONF=$(cygpath --path --windows "${BOOTSTRAP_CONF}")
     else
-        if [ -n "$run_as" ]; then
-            if id -u "$run_as" >/dev/null 2>&1; then
+        if [ -n "${run_as}" ]; then
+            if id -u "${run_as}" >/dev/null 2>&1; then
                 sudo_cmd_prefix="sudo -u ${run_as}"
             else
                 echo "The specified run.as user ${run_as} does not exist. Exiting."
@@ -176,18 +176,18 @@ run() {
     fi
 
     echo
-    echo "Java home: $JAVA_HOME"
-    echo "NiFi home: $NIFI_HOME"
+    echo "Java home: ${JAVA_HOME}"
+    echo "NiFi home: ${NIFI_HOME}"
     echo
-    echo "Bootstrap Config File: $BOOTSTRAP_CONF"
+    echo "Bootstrap Config File: ${BOOTSTRAP_CONF}"
     echo
 
     # run 'start' in the background because the process will continue to run, monitoring NiFi.
     # all other commands will terminate quickly so want to just wait for them
     if [ "$1" = "start" ]; then
-        (cd "$NIFI_HOME" && ${sudo_cmd_prefix} "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ &)
+        (cd "${NIFI_HOME}" && ${sudo_cmd_prefix} "${JAVA}" -cp "${NIFI_HOME}"/conf/:"${NIFI_HOME}"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="${BOOTSTRAP_CONF}" org.apache.nifi.bootstrap.RunNiFi $@ &)
     else
-        (cd "$NIFI_HOME" && ${sudo_cmd_prefix} "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@)
+        (cd "${NIFI_HOME}" && ${sudo_cmd_prefix} "${JAVA}" -cp "${NIFI_HOME}"/conf/:"${NIFI_HOME}"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="${BOOTSTRAP_CONF}" org.apache.nifi.bootstrap.RunNiFi $@)
     fi
 
     # Wait just a bit (3 secs) to wait for the logging to finish and then echo a new-line.


[37/41] nifi git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi

Posted by ma...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5d90c9be
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5d90c9be
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5d90c9be

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 5d90c9be07b40455c831e5a602eeeb6660cdd8c6
Parents: bd506b1 a5a5bad
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:52:33 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:52:33 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 49 ++++++++++++++++++--
 .../processors/avro/TestConvertAvroToJSON.java  | 47 +++++++++++++++++--
 2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[32/41] nifi git commit: Merge branch 'NIFI-988'

Posted by ma...@apache.org.
Merge branch 'NIFI-988'


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da28b81e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da28b81e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da28b81e

Branch: refs/heads/NIFI-810-InputRequirement
Commit: da28b81eece1277c7500f28708155239ae317e57
Parents: e68fdca 97441ea
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 21 10:44:06 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 21 10:44:06 2015 -0400

----------------------------------------------------------------------
 .../standard/PutDistributedMapCache.java        | 244 ++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestPutDistributedMapCache.java    | 280 +++++++++++++++++++
 3 files changed, 525 insertions(+)
----------------------------------------------------------------------



[33/41] nifi git commit: NIFI-945 Create a new property (JSON Container) in ConvertAvroToJson, which determines how a stream of records is exposed: either as a sequence of single Objects ("none"), writing every Object to a new line, or as an array of Objects

Posted by ma...@apache.org.
NIFI-945 Create a new property (JSON Container) in ConvertAvroToJson, which determines how a stream of records is exposed: either as a sequence of single Objects ("none"),
writing every Object to a new line, or as an array of Objects.

Let's assume you have Avro content as a stream of records (record1, record2, ...). If JSON Container is "none", the converter will expose the records as a sequence of
single JSON objects:

record1
record2
...
recordN

Please bear in mind that the final output is not valid JSON content. You can then forward this content, e.g., to Kafka, where every record will be a single Kafka message.

If JSON Container is "array", the output looks like this:

[record1,record2,...,recordN]

It is useful when you want to convert your Avro content to a valid JSON array.

This closes #88

Reviewed and Amended (amendments reviewed by original patch author on github) by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a5a5badb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a5a5badb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a5a5badb

Branch: refs/heads/NIFI-810-InputRequirement
Commit: a5a5badb88311cb29cbe39088b57b9686314a1c6
Parents: da28b81
Author: Joe <jo...@impresstv.com>
Authored: Wed Oct 21 20:11:06 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Oct 21 20:15:29 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 49 ++++++++++++++++++--
 .../processors/avro/TestConvertAvroToJSON.java  | 47 +++++++++++++++++--
 2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
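A minimal usage sketch of the new option, mirroring the tests added in this patch (the TestRunner setup and the avroBytes variable are illustrative assumptions, not part of the commit):

    final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
    // "none" writes one JSON object per line; "array" wraps all records in a JSON array
    runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
    runner.enqueue(avroBytes); // avroBytes: a serialized Avro data file (assumption)
    runner.run();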


http://git-wip-us.apache.org/repos/asf/nifi/blob/a5a5badb/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 8832a73..016750b 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.avro.file.DataFileStream;
@@ -34,11 +37,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
@@ -49,9 +54,20 @@ import org.apache.nifi.processor.io.StreamCallback;
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
     + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
-    + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records.")
+    + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects")
 @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
 public class ConvertAvroToJSON extends AbstractProcessor {
+    protected static final String CONTAINER_ARRAY = "array";
+    protected static final String CONTAINER_NONE = "none";
+
+    static final PropertyDescriptor CONTAINER_OPTIONS
+            = new PropertyDescriptor.Builder()
+            .name("JSON container options")
+            .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
+            .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
+            .required(true)
+            .defaultValue(CONTAINER_ARRAY)
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -62,6 +78,23 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
             .build();
 
+    
+
+    private List<PropertyDescriptor> properties;
+    
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        super.init(context);
+        
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CONTAINER_OPTIONS);
+        this.properties = Collections.unmodifiableList(properties);
+    
+    }
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> rels = new HashSet<>();
@@ -77,11 +110,14 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             return;
         }
 
+        final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
+
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn);
+
                          final OutputStream out = new BufferedOutputStream(rawOut);
                          final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
@@ -90,7 +126,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                         final String json = genericData.toString(record);
 
                         int recordCount = 0;
-                        if (reader.hasNext()) {
+                        if (reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)) {
                             out.write('[');
                         }
 
@@ -98,13 +134,18 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                         recordCount++;
 
                         while (reader.hasNext()) {
-                            out.write(',');
+                            if (containerOption.equals(CONTAINER_ARRAY)) {
+                                out.write(',');
+                            } else {
+                                out.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
+                            }
+
                             final GenericRecord nextRecord = reader.next(record);
                             out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8));
                             recordCount++;
                         }
 
-                        if (recordCount > 1) {
+                        if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) {
                             out.write(']');
                         }
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a5a5badb/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index cfd26c1..302528e 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -16,7 +16,11 @@
  */
 package org.apache.nifi.processors.avro;
 
+import java.io.File;
+import java.io.IOException;
+
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -27,9 +31,6 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-
 public class TestConvertAvroToJSON {
 
     @Test
@@ -57,6 +58,8 @@ public class TestConvertAvroToJSON {
         final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
 
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
+
         final GenericRecord user1 = new GenericData.Record(schema);
         user1.put("name", "Alyssa");
         user1.put("favorite_number", 256);
@@ -85,4 +88,42 @@ public class TestConvertAvroToJSON {
         runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
     }
 
+    private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+        dataFileWriter.create(schema, out);
+        for (final GenericRecord user : users) {
+            dataFileWriter.append(user);
+        }
+
+        dataFileWriter.close();
+        return out;
+    }
+
+    @Test
+    public void testMultipleAvroMessagesContainerNone() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "George");
+        user2.put("favorite_number", 1024);
+        user2.put("favorite_color", "red");
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}");
+    }
 }


[34/41] nifi git commit: NIFI-972 attribute to indicate row count and cleanup

Posted by ma...@apache.org.
NIFI-972 attribute to indicate row count and cleanup

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9e53250
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9e53250
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9e53250

Branch: refs/heads/NIFI-810-InputRequirement
Commit: a9e5325047fbe3ad8a7d53b664e7944e39fcf658
Parents: ba3225f
Author: Toivo Adams <to...@gmail.com>
Authored: Fri Oct 23 12:44:27 2015 +0300
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:28:03 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/ExecuteSQL.java    |  9 ++++++++-
 .../nifi/processors/standard/util/JdbcCommon.java      | 13 -------------
 2 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
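A hedged sketch of how a test might assert the new attribute (the runner setup and the expected count of "2" are assumptions for illustration; the attribute name matches ExecuteSQL.RESULT_ROW_COUNT below):

    final MockFlowFile out = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
    out.assertAttributeEquals("executesql.row.count", "2"); // hypothetical row count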


http://git-wip-us.apache.org/repos/asf/nifi/blob/a9e53250/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 45fd1a8..2a13f32 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -53,9 +53,13 @@ import org.apache.nifi.util.StopWatch;
     + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
         "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
         "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
-        "select query.")
+        "select query. " +
+        "FlowFile attribute 'executesql.row.count' indicates how many rows were selected."
+        )
 public class ExecuteSQL extends AbstractProcessor {
 
+    public static final String RESULT_ROW_COUNT = "executesql.row.count";
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -153,6 +157,9 @@ public class ExecuteSQL extends AbstractProcessor {
                 }
             });
 
+            // set an attribute indicating how many rows were selected
+            outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString());
+
             logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() });
             logger.info("Transferred {} to 'success'", new Object[] { outgoing });
             session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9e53250/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index de3d5d1..9cf9338 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -134,53 +134,41 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
-//                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
-//                    builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
-
-
                     break;
 
                 case BOOLEAN:
-//                    builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
                    break;
 
                 case INTEGER:
                 case SMALLINT:
                 case TINYINT:
-//                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
                     break;
 
                 case BIGINT:
-//                    builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
                     break;
 
                 // java.sql.RowId is interface, is seems to be database
                 // implementation specific, let's convert to String
                 case ROWID:
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 case FLOAT:
                 case REAL:
-//                    builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
                     break;
 
                 case DOUBLE:
-//                    builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
@@ -188,7 +176,6 @@ public class JdbcCommon {
                 case DATE:
                 case TIME:
                 case TIMESTAMP:
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 


[24/41] nifi git commit: NIFI-747 This closes #104. PR from Venkatesh Sellappa was modified, then code reviewed by Joe Witt (comments in ticket)

Posted by ma...@apache.org.
NIFI-747 This closes #104. PR from Venkatesh Sellappa <VS...@outlook.com> was modified, then code reviewed by Joe Witt (comments in ticket)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/88b1b844
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/88b1b844
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/88b1b844

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 88b1b844fba5e4dded6242bb17f2096ff4172ed3
Parents: ad73a23
Author: Tony Kurc <tr...@gmail.com>
Authored: Sun Oct 18 20:04:24 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sun Oct 18 20:04:24 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenHTTP.java    | 28 ++++++++++++++++----
 .../servlets/ContentAcknowledgmentServlet.java  |  3 +--
 .../standard/servlets/ListenHTTPServlet.java    |  7 +++--
 3 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
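A minimal configuration sketch for the new Base Path property (the runner and port value are assumptions; the property names come from the patch below):

    final TestRunner runner = TestRunners.newTestRunner(new ListenHTTP());
    runner.setProperty(ListenHTTP.BASE_PATH, "contentListener");
    runner.setProperty(ListenHTTP.PORT, "8080");
    // FlowFiles may then be POSTed to http://{hostname}:8080/contentListener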


http://git-wip-us.apache.org/repos/asf/nifi/blob/88b1b844/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index c7842d9..a446eb6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -63,7 +63,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
 @Tags({"ingest", "http", "https", "rest", "listen"})
-@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
+@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
 public class ListenHTTP extends AbstractSessionFactoryProcessor {
 
     private Set<Relationship> relationships;
@@ -74,6 +74,14 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .description("Relationship for successfully received FlowFiles")
             .build();
 
+    public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
+            .name("Base Path")
+            .description("Base path for incoming connections")
+            .required(true)
+            .defaultValue("contentListener")
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+            .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
             .name("Listening Port")
             .description("The Port to listen on for incoming connections")
@@ -113,7 +121,6 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .required(false)
             .build();
 
-    public static final String URI = "/contentListener";
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
@@ -122,6 +129,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
+    public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
 
     private volatile Server server = null;
     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
@@ -134,6 +142,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         this.relationships = Collections.unmodifiableSet(relationships);
 
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(BASE_PATH);
         descriptors.add(PORT);
         descriptors.add(MAX_DATA_RATE);
         descriptors.add(SSL_CONTEXT_SERVICE);
@@ -170,6 +179,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     }
 
     private void createHttpServerFromService(final ProcessContext context) throws Exception {
+        final String basePath = context.getProperty(BASE_PATH).getValue();
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
@@ -230,12 +240,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
         for (final Class<? extends Servlet> cls : getServerClasses()) {
             final Path path = cls.getAnnotation(Path.class);
-            if (path == null) {
-                contextHandler.addServlet(cls, "/*");
-            } else {
+            // Note: servlets must have a path annotation - this will NPE otherwise
+            // also, servlets other than ListenHttpServlet must have a path starting with /
+            if(basePath.isEmpty() && !path.value().isEmpty()){
+                // Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
                 contextHandler.addServlet(cls, path.value());
             }
+            else{
+                contextHandler.addServlet(cls, "/" + basePath + path.value());
+            }
         }
+
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
@@ -243,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
+        contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
             contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
@@ -259,6 +275,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
 
     protected Set<Class<? extends Servlet>> getServerClasses() {
         final Set<Class<? extends Servlet>> s = new HashSet<>();
+        // NOTE: Servlets added below MUST have a Path annotation
+        // any servlets other than ListenHTTPServlet must have a Path annotation start with /
         s.add(ListenHTTPServlet.class);
         s.add(ContentAcknowledgmentServlet.class);
         return s;

http://git-wip-us.apache.org/repos/asf/nifi/blob/88b1b844/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
index 7dd6797..3252aea 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java
@@ -38,10 +38,9 @@ import org.apache.nifi.processors.standard.ListenHTTP;
 import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
 import org.apache.nifi.util.FormatUtils;
 
-@Path(ContentAcknowledgmentServlet.URI)
+@Path("/holds/*")
 public class ContentAcknowledgmentServlet extends HttpServlet {
 
-    public static final String URI = ListenHTTP.URI + "/holds/*";
     public static final String DEFAULT_FOUND_SUBJECT = "none";
     private static final long serialVersionUID = -2675148117984902978L;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/88b1b844/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 6a8f32f..79d3887 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -61,7 +61,8 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
-@Path(ListenHTTP.URI)
+
+@Path("")
 public class ListenHTTPServlet extends HttpServlet {
 
     private static final long serialVersionUID = 5329940480987723163L;
@@ -93,6 +94,7 @@ public class ListenHTTPServlet extends HttpServlet {
     private Pattern headerPattern;
     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
     private StreamThrottler streamThrottler;
+    private String basePath;
 
     @SuppressWarnings("unchecked")
     @Override
@@ -105,6 +107,7 @@ public class ListenHTTPServlet extends HttpServlet {
         this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
+        this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
     }
 
     @Override
@@ -291,7 +294,7 @@ public class ListenHTTPServlet extends HttpServlet {
                 } while (previousWrapper != null);
 
                 response.setStatus(HttpServletResponse.SC_SEE_OTHER);
-                final String ackUri = ListenHTTP.URI + "/holds/" + uuid;
+                final String ackUri =  "/" + basePath + "/holds/" + uuid;
                 response.addHeader(LOCATION_HEADER_NAME, ackUri);
                 response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
                 response.getOutputStream().write(ackUri.getBytes("UTF-8"));


[19/41] nifi git commit: NIFI-612 Remove FlowUnmarshaller. Builds and runs; could not find any latent references via reflection or other indirect ways of loading classes. This closes #103. Signed off by Tony Kurc

Posted by ma...@apache.org.
NIFI-612 Remove FlowUnmarshaller. Builds and runs; could not find any latent references via reflection or other indirect ways of loading classes. This closes #103. Signed off by Tony Kurc <tk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9a8d763d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9a8d763d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9a8d763d

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 9a8d763d8dddb8feb7e4b176cd97a7790436f60c
Parents: ce7d098
Author: Venkatesh Sellappa <VS...@outlook.com>
Authored: Sat Oct 17 09:43:10 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sat Oct 17 09:43:10 2015 -0400

----------------------------------------------------------------------
 .../nifi/controller/FlowUnmarshaller.java       | 77 --------------------
 1 file changed, 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9a8d763d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
deleted file mode 100644
index c8d90d7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-public class FlowUnmarshaller {
-
-    /**
-     * Interprets the given byte array as an XML document that conforms to the Flow Configuration schema and returns a FlowSnippetDTO representing the flow
-     *
-     * @param flowContents contents
-     * @param encryptor encryptor
-     * @return snippet dto
-     * @throws NullPointerException if <code>flowContents</code> is null
-     * @throws IOException ioe
-     * @throws SAXException sax
-     * @throws ParserConfigurationException pe
-     */
-    public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException {
-        if (Objects.requireNonNull(flowContents).length == 0) {
-            return new FlowSnippetDTO();
-        }
-
-        final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-        dbf.setNamespaceAware(true);
-
-        final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
-        final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents));
-        final FlowSnippetDTO flowDto = new FlowSnippetDTO();
-
-        final NodeList nodeList = document.getElementsByTagName("rootGroup");
-        if (nodeList.getLength() == 0) {
-            return flowDto;
-        }
-        if (nodeList.getLength() > 1) {
-            throw new IllegalArgumentException("Contents contain multiple rootGroup elements");
-        }
-
-        final Set<ProcessGroupDTO> rootGroupSet = new HashSet<>();
-        flowDto.setProcessGroups(rootGroupSet);
-        rootGroupSet.add(FlowFromDOMFactory.getProcessGroup(null, (Element) nodeList.item(0), encryptor));
-
-        return flowDto;
-    }
-}


[10/41] nifi git commit: NIFI-988: PutDistributedMapCache processor implementation

Posted by ma...@apache.org.
NIFI-988: PutDistributedMapCache processor implementation


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b1328f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b1328f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b1328f3

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 6b1328f3f181a27a5856d26983ed3329ee317522
Parents: 14eaeeb
Author: Joe <jo...@impresstv.com>
Authored: Wed Sep 23 13:16:02 2015 +0200
Committer: Joe <jo...@impresstv.com>
Committed: Wed Sep 23 14:31:13 2015 +0200

----------------------------------------------------------------------
 .../standard/PutDistributedMapCache.java        | 244 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 2 files changed, 245 insertions(+)
----------------------------------------------------------------------
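A hedged usage sketch for the new processor (descriptor names come from the class below; the controller-service id "map-cache" and its registration are assumptions):

    final TestRunner runner = TestRunners.newTestRunner(new PutDistributedMapCache());
    // a DistributedMapCacheClient service must first be registered under this id
    runner.setProperty(PutDistributedMapCache.DISTRIBUTED_CACHE_SERVICE, "map-cache");
    runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${filename}");
    runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue());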


http://git-wip-us.apache.org/repos/asf/nifi/blob/6b1328f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
new file mode 100644
index 0000000..8e50c9f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+@EventDriven
+@SupportsBatching
+@Tags({"map", "cache", "put", "distributed"})
+@CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key " +
+        "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
+        "'keep original' the entry is not replaced.'")
+@WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
+        "attribute is true, is the FlowFile is cached, otherwise false.")
+@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
+public class PutDistributedMapCache extends AbstractProcessor {
+
+    public static final String CACHED_ATTRIBUTE_NAME = "cached";
+
+    // Identifies the distributed map cache client
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+            .name("Distributed Cache Service")
+            .description("The Controller Service that is used to cache flow files")
+            .required(true)
+            .identifiesControllerService(DistributedMapCacheClient.class)
+            .build();
+
+    // Selects the FlowFile attribute, whose value is used as cache key
+    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
+            .name("Cache Entry Identifier")
+            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
+                    "be evaluated against a FlowFile in order to determine the cache key")
+            .required(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present",
+            "Adds the specified entry to the cache, replacing any value that is currently set.");
+
+    public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
+            "Adds the specified entry to the cache, if the key does not exist.");
+
+    public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Cache update strategy")
+            .description("Determines how the cache is updated if the cache already contains the entry")
+            .required(true)
+            .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
+            .defaultValue(CACHE_UPDATE_REPLACE.getValue())
+            .build();
+
+    public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder()
+            .name("Max cache entry size")
+            .description("The maximum amount of data to put into cache")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .expressionLanguageSupported(false)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
+            .build();
+    private final Set<Relationship> relationships;
+
+    private final Serializer<String> keySerializer = new StringSerializer();
+    private final Serializer<byte[]> valueSerializer = new CacheValueSerializer();
+    private final Deserializer<byte[]> valueDeserializer = new CacheValueDeserializer();
+
+    public PutDistributedMapCache() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(CACHE_ENTRY_IDENTIFIER);
+        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
+        descriptors.add(CACHE_UPDATE_STRATEGY);
+        descriptors.add(CACHE_ENTRY_MAX_BYTES);
+        return descriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        // cache key is computed from attribute 'CACHE_ENTRY_IDENTIFIER' with expression language support
+        final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+
+        // if the computed value is null or empty, we transfer the flow file to the failure relationship
+        if (StringUtils.isBlank(cacheKey)) {
+            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[] {flowFile});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // the cache client used to interact with the distributed cache
+        final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        try {
+
+            final long maxCacheEntrySize = context.getProperty(CACHE_ENTRY_MAX_BYTES).asDataSize(DataUnit.B).longValue();
+            long flowFileSize = flowFile.getSize();
+
+            // flow file content is too large to cache
+            if (flowFileSize > maxCacheEntrySize) {
+                logger.warn("Flow file {} size {} exceeds the max cache entry size ({} B).", new Object[] {flowFile, flowFileSize, maxCacheEntrySize});
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            if (flowFileSize == 0) {
+                logger.warn("Flow file {} is empty; there is nothing to cache.", new Object[] {flowFile});
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            // get flow file content
+            final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+            session.exportTo(flowFile, byteStream);
+            byte[] cacheValue = byteStream.toByteArray();
+            final String updateStrategy = context.getProperty(CACHE_UPDATE_STRATEGY).getValue();
+            boolean cached = false;
+
+            if (updateStrategy.equals(CACHE_UPDATE_REPLACE.getValue())) {
+                cache.put(cacheKey, cacheValue, keySerializer, valueSerializer);
+                cached = true;
+            } else if (updateStrategy.equals(CACHE_UPDATE_KEEP_ORIGINAL.getValue())) {
+                final byte[] oldValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
+                if (oldValue == null) {
+                    cached = true;
+                }
+            }
+
+            // set 'cached' attribute
+            flowFile = session.putAttribute(flowFile, CACHED_ATTRIBUTE_NAME, String.valueOf(cached));
+
+            if (cached) {
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+
+        } catch (final IOException e) {
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
+        }
+    }
+
+    public static class CacheValueSerializer implements Serializer<byte[]> {
+
+        @Override
+        public void serialize(final byte[] bytes, final OutputStream out) throws SerializationException, IOException {
+            out.write(bytes);
+        }
+    }
+
+    public static class CacheValueDeserializer implements Deserializer<byte[]> {
+
+        @Override
+        public byte[] deserialize(final byte[] input) throws DeserializationException, IOException {
+            if (input == null || input.length == 0) {
+                return null;
+            }
+            return input;
+        }
+    }
+
+    /**
+     * Simple string serializer, used for serializing the cache key
+     */
+    public static class StringSerializer implements Serializer<String> {
+
+        @Override
+        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
+            out.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+}
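
A quick way to see what the serializers at the bottom of this class actually put on the wire is a standalone round trip. The sketch below is illustrative only: it is not part of this commit, the class name is made up, and it assumes it sits in the same package as the processor so the nested classes are visible.

// Hypothetical round-trip check for the serializers above; not part of this commit.
package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SerializerRoundTripSketch {

    public static void main(final String[] args) throws IOException {
        // the cache key is written as raw UTF-8 bytes
        final PutDistributedMapCache.StringSerializer keySerializer = new PutDistributedMapCache.StringSerializer();
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        keySerializer.serialize("cache-key", out);
        System.out.println(new String(out.toByteArray(), StandardCharsets.UTF_8)); // prints: cache-key

        // an empty or null answer from the cache is normalized to null, which
        // onTrigger() above reads as "no pre-existing entry"
        final PutDistributedMapCache.CacheValueDeserializer deserializer = new PutDistributedMapCache.CacheValueDeserializer();
        System.out.println(deserializer.deserialize(new byte[0])); // prints: null
    }
}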

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b1328f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0ce1456..ff39ad3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -49,6 +49,7 @@ org.apache.nifi.processors.standard.ModifyBytes
 org.apache.nifi.processors.standard.MonitorActivity
 org.apache.nifi.processors.standard.PostHTTP
 org.apache.nifi.processors.standard.PutEmail
+org.apache.nifi.processors.standard.PutDistributedMapCache
 org.apache.nifi.processors.standard.PutFile
 org.apache.nifi.processors.standard.PutFTP
 org.apache.nifi.processors.standard.PutJMS
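
The one-line addition above is what makes the new processor discoverable at all: NiFi locates Processor implementations through the standard java.util.ServiceLoader mechanism, which reads this META-INF/services file. The sketch below is a simplified illustration of that lookup; the real framework does this per-NAR with dedicated classloaders, and the class name here is made up.

// Simplified sketch of service discovery; NiFi's actual NAR classloading is more involved.
import java.util.ServiceLoader;

import org.apache.nifi.processor.Processor;

public class ListRegisteredProcessors {
    public static void main(final String[] args) {
        // picks up every class listed in META-INF/services/org.apache.nifi.processor.Processor
        for (final Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}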


[36/41] nifi git commit: NIFI-972: Added additional unit test; deleted lines that were commented out

Posted by ma...@apache.org.
NIFI-972: Added additional unit test; deleted lines that were commented out


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bd506b1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bd506b1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bd506b1e

Branch: refs/heads/NIFI-810-InputRequirement
Commit: bd506b1e10ebc2ce025e836f83cb0f77562deba4
Parents: a9e5325
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:52:09 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:52:09 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/util/JdbcCommon.java    |  8 ++--
 .../standard/util/TestJdbcCommon.java           | 42 ++++++++++++++++++++
 .../standard/util/TestJdbcTypesDerby.java       |  4 --
 3 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bd506b1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 9cf9338..937dcab 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard.util;
 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
 import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
 import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
 import static java.sql.Types.CHAR;
@@ -83,7 +84,7 @@ public class JdbcCommon {
                     if (value == null) {
                         rec.put(i - 1, null);
 
-                    } else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
+                    } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) {
                         // byte types require slightly different handling
                         byte[] bytes = rs.getBytes(i);
                         ByteBuffer bb = ByteBuffer.wrap(bytes);
@@ -104,7 +105,7 @@ public class JdbcCommon {
                         // The different types that we support are numbers (int, long, double, float),
                         // as well as boolean values and Strings. Since Avro doesn't provide
                         // timestamp types, we want to convert those to Strings. So we will cast anything other
-                        // than numbers or booleans to strings by using to toString() method.
+                        // than numbers or booleans to strings by using the toString() method.
                         rec.put(i - 1, value.toString());
                     }
                 }
@@ -137,9 +138,10 @@ public class JdbcCommon {
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
+                case BIT:
                 case BOOLEAN:
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
-                   break;
+                    break;
 
                 case INTEGER:
                 case SMALLINT:
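
Folding BIT into the BOOLEAN branch means a BIT column now gets the same nullable union as a boolean. Expressed directly with Avro's SchemaBuilder, using the exact builder pattern from the diff (the record and field names here are placeholders), the branch builds something like:

// Illustrative only: what the BIT/BOOLEAN case above produces for a single column.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class BitColumnSchemaSketch {
    public static void main(final String[] args) {
        final Schema schema = SchemaBuilder.record("table").namespace("any.data").fields()
                .name("active").type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault()
                .endRecord();
        // prints a ["null","boolean"] union for the field, e.g.:
        // {"type":"record","name":"table","namespace":"any.data","fields":[{"name":"active","type":["null","boolean"]}]}
        System.out.println(schema);
    }
}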

http://git-wip-us.apache.org/repos/asf/nifi/blob/bd506b1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index f54d4ba..9c9532f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -24,19 +24,26 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestJdbcCommon {
 
@@ -138,6 +145,41 @@ public class TestJdbcCommon {
         }
     }
 
+
+    @Test
+    public void testCreateSchemaTypes() throws SQLException, IllegalArgumentException, IllegalAccessException {
+        final Set<Integer> fieldsToIgnore = new HashSet<>();
+        fieldsToIgnore.add(Types.NULL);
+        fieldsToIgnore.add(Types.OTHER);
+
+        final Field[] fieldTypes = Types.class.getFields();
+        for (final Field field : fieldTypes) {
+            final Object fieldObject = field.get(null);
+            final int type = (int) fieldObject;
+
+            if (fieldsToIgnore.contains(type)) {
+                continue;
+            }
+
+            final ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+            Mockito.when(metadata.getColumnCount()).thenReturn(1);
+            Mockito.when(metadata.getColumnType(1)).thenReturn(type);
+            Mockito.when(metadata.getColumnName(1)).thenReturn(field.getName());
+            Mockito.when(metadata.getTableName(1)).thenReturn("table");
+
+            final ResultSet rs = Mockito.mock(ResultSet.class);
+            Mockito.when(rs.getMetaData()).thenReturn(metadata);
+
+            try {
+                JdbcCommon.createSchema(rs);
+            } catch (final IllegalArgumentException | SQLException sqle) {
+                sqle.printStackTrace();
+                Assert.fail("Failed when using type " + field.getName());
+            }
+        }
+    }
+
+
     // many test use Derby as database, so ensure driver is available
     @Test
     public void testDriverLoad() throws ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bd506b1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
index cf3d0c6..fc2bccd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -64,10 +64,6 @@ public class TestJdbcTypesDerby {
             + "  active tinyint NOT NULL DEFAULT 0, "
             + "  home_module_id int DEFAULT NULL, "
             + "   PRIMARY KEY (id) ) " ;
-//            + "   UNIQUE email ) " ;
-//            + "   KEY home_module_id (home_module_id) ) " ;
-//            + "   CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES "
-//            + "  modules (id) ON DELETE SET NULL " ;
 
     String dropTable = "drop table users";
 


[06/41] nifi git commit: Fix a typo and modify an error message

Posted by ma...@apache.org.
Fix a typo and modify an error message


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c492a1aa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c492a1aa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c492a1aa

Branch: refs/heads/NIFI-810-InputRequirement
Commit: c492a1aaae14efae3506ae43979fb93ba6655438
Parents: 0334f04
Author: Yuu ISHIKAWA <yu...@gmail.com>
Authored: Wed Sep 2 21:54:02 2015 +0900
Committer: Yuu ISHIKAWA <yu...@gmail.com>
Committed: Wed Sep 2 21:54:02 2015 +0900

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c492a1aa/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 803a6ab..dffcab8 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -94,7 +94,7 @@ public class DeleteS3Object extends AbstractS3Processor {
         try {
           s3.getObjectMetadata(bucket, key);
         } catch (final AmazonServiceException ase) {
-            getLogger().error("Not found sucha a file and folder on Amazon S3 {}", new Object[]{flowFile, ase});
+            getLogger().error("Not found such a S3 object for {}; routing to not found", new Object[]{flowFile, ase});
             session.transfer(flowFile, REL_NOT_FOUND);
             return;
         }


[07/41] nifi git commit: Remove the check for whether the target key exists

Posted by ma...@apache.org.
Remove the check for whether the target key exists


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d32a32a9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d32a32a9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d32a32a9

Branch: refs/heads/NIFI-810-InputRequirement
Commit: d32a32a92d29707d22899862594a2679d3fb2273
Parents: c492a1a
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Mon Sep 7 23:32:48 2015 +0900
Committer: Yu ISHIKAWA <yu...@gmail.com>
Committed: Mon Sep 7 23:32:48 2015 +0900

----------------------------------------------------------------------
 .../nifi/processors/aws/s3/DeleteS3Object.java       | 15 ---------------
 .../nifi/processors/aws/s3/TestDeleteS3Object.java   | 14 ++------------
 2 files changed, 2 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d32a32a9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index dffcab8..c8950c3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -65,11 +65,6 @@ public class DeleteS3Object extends AbstractS3Processor {
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_NOT_FOUND)));
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
     }
@@ -89,16 +84,6 @@ public class DeleteS3Object extends AbstractS3Processor {
 
         final AmazonS3 s3 = getClient();
 
-        // Checks if the key exists or not
-        // If there is no such a key, then throws a exception
-        try {
-          s3.getObjectMetadata(bucket, key);
-        } catch (final AmazonServiceException ase) {
-            getLogger().error("Not found such a S3 object for {}; routing to not found", new Object[]{flowFile, ase});
-            session.transfer(flowFile, REL_NOT_FOUND);
-            return;
-        }
-
         // Deletes a key on Amazon S3
         try {
             if (versionId == null) {
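
With the pre-check gone, the processor relies entirely on the delete call itself, which in the AWS SDK takes one of two forms depending on whether a version id is supplied. A condensed sketch of that branch follows; the class and method names of the sketch are illustrative, while the SDK request types are the ones already imported by this file.

// Illustrative sketch of the two delete paths; uses only standard AWS SDK v1 calls.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;

public class S3DeleteSketch {
    public static void delete(final AmazonS3 s3, final String bucket, final String key, final String versionId) {
        if (versionId == null) {
            // plain delete; S3 reports success even if the key never existed, hence no pre-check is needed
            s3.deleteObject(new DeleteObjectRequest(bucket, key));
        } else {
            // delete one specific version of a versioned object
            s3.deleteVersion(new DeleteVersionRequest(bucket, key, versionId));
        }
    }
}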

http://git-wip-us.apache.org/repos/asf/nifi/blob/d32a32a9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index 04d9e61..082a80d 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -22,12 +22,9 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
 
 import com.amazonaws.auth.PropertiesCredentials;
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -36,7 +33,7 @@ import com.amazonaws.services.s3.model.CreateBucketRequest;
 import com.amazonaws.services.s3.model.DeleteBucketRequest;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
-import org.apache.nifi.processor.Relationship;
+
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
@@ -128,14 +125,7 @@ public class TestDeleteS3Object {
         runner.enqueue(new byte[0], attrs);
         runner.run(1);
 
-        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_NOT_FOUND, 1);
-    }
-
-    @Test
-    public void testGetRelationships() {
-        DeleteS3Object deleter = new DeleteS3Object();
-        Set<Relationship> relationships = deleter.getRelationships();
-        assertEquals(relationships.size(), 3);
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
     }
 
     // Uploads a test file


[22/41] nifi git commit: NIFI-922: Upgrade to ActiveMQ 5.12.0. closes #81.

Posted by ma...@apache.org.
NIFI-922: Upgrade to ActiveMQ 5.12.0. closes #81.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bd47f36c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bd47f36c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bd47f36c

Branch: refs/heads/NIFI-810-InputRequirement
Commit: bd47f36c06db91d497a4b33117c386666f17729c
Parents: 943ccfe
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Sep 3 15:53:47 2015 +0200
Committer: joewitt <jo...@apache.org>
Committed: Sun Oct 18 19:13:26 2015 -0400

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bd47f36c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1d5a857..74dc4a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -512,7 +512,7 @@
             <dependency>
                 <groupId>org.apache.activemq</groupId>
                 <artifactId>activemq-client</artifactId>
-                <version>5.11.1</version>
+                <version>5.12.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.lucene</groupId>


[29/41] nifi git commit: NIFI-1042 Adds restlistener.remote.source.ip attribute to ListenHTTPServlet. This closes #105

Posted by ma...@apache.org.
NIFI-1042 Adds restlistener.remote.source.ip attribute to ListenHTTPServlet. This closes #105


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/518670db
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/518670db
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/518670db

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 518670dbf66a20855357fc379f516f488921d0c5
Parents: b809031
Author: Andre F de Miranda <tr...@users.noreply.github.com>
Authored: Mon Oct 19 23:43:31 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 19 23:43:44 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/servlets/ListenHTTPServlet.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/518670db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 79d3887..d740f93 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -269,6 +269,7 @@ public class ListenHTTPServlet extends HttpServlet {
 
                 flowFile = session.putAllAttributes(flowFile, attributes);
                 session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis);
+                flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
                 flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
                 flowFileSet.add(flowFile);
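
The new attribute is populated straight from the servlet request. One detail worth noting in review: getRemoteHost() only returns a hostname when the container performs reverse DNS lookups; otherwise it returns the client IP, which is presumably why the ticket title speaks of a source ip. A trivial sketch, with illustrative names:

// Minimal sketch; class and method names are made up.
import javax.servlet.http.HttpServletRequest;

public class RemoteSourceAttribute {
    public static String remoteSource(final HttpServletRequest request) {
        // returns the client hostname if the container resolves it, else the client IP
        return request.getRemoteHost();
    }
}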
 


[39/41] nifi git commit: NIFI-1010: If database driver does not support getting table name for column from ResultSetMetadata, then just use a default name

Posted by ma...@apache.org.
NIFI-1010: If database driver does not support getting table name for column from ResultSetMetadata, then just use a default name


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/88fc8d28
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/88fc8d28
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/88fc8d28

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 88fc8d28a0502fa7d66662d6646f31cf33e131be
Parents: 22924c6
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 18 22:57:04 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 10:25:59 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/util/JdbcCommon.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/88fc8d28/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 6fc69ff..753513b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -51,6 +51,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * JDBC / SQL common functions.
@@ -95,7 +96,10 @@ public class JdbcCommon {
     public static Schema createSchema(final ResultSet rs) throws SQLException {
         final ResultSetMetaData meta = rs.getMetaData();
         final int nrOfColumns = meta.getColumnCount();
-        final String tableName = meta.getTableName(1);
+        String tableName = meta.getTableName(1);
+        if (StringUtils.isBlank(tableName)) {
+            tableName = "NiFi_ExecuteSQL_Record";
+        }
 
         final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
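
The fallback matters because Avro record names must be valid, non-empty identifiers; a driver that returns a blank table name would otherwise fail inside SchemaBuilder. A small demonstration of the failure mode, illustrative only:

// Illustrative only: Avro rejects a blank record name, which is what the fallback above guards against.
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaParseException;

public class BlankRecordNameSketch {
    public static void main(final String[] args) {
        try {
            SchemaBuilder.record("").namespace("any.data").fields().endRecord();
        } catch (final SchemaParseException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}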
 


[25/41] nifi git commit: NIFI-1016 Naive fix so org.apache.nifi.prioritizer.PriorityAttributePrioritizerTest builds

Posted by ma...@apache.org.
NIFI-1016 Naive fix so org.apache.nifi.prioritizer.PriorityAttributePrioritizerTest builds


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/92005422
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/92005422
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/92005422

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 9200542298c33bd7527a5329205527445613b2b4
Parents: 88b1b84
Author: Tony Kurc <tr...@gmail.com>
Authored: Sun Oct 18 20:34:00 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sun Oct 18 20:34:36 2015 -0400

----------------------------------------------------------------------
 .../PriorityAttributePrioritizerTest.java          | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/92005422/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
index d7d278c..7098551 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/test/java/org/apache/nifi/prioritizer/PriorityAttributePrioritizerTest.java
@@ -16,14 +16,13 @@
  */
 package org.apache.nifi.prioritizer;
 
-import org.apache.nifi.prioritizer.PriorityAttributePrioritizer;
-
 import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -48,13 +47,13 @@ public class PriorityAttributePrioritizerTest {
 
     @BeforeClass
     public static void init() {
-        attrsPri1.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "1");
-        attrsPri2.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "2");
-        attrsPrin1.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "-1");
-        attrsPriA.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "A");
-        attrsPriB.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "B");
-        attrsPriLP.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "5432123456789");
-        attrsPriLN.put(PriorityAttributePrioritizer.PRIORITY_ATTR, "-5432123456789");
+        attrsPri1.put(CoreAttributes.PRIORITY.key(), "1");
+        attrsPri2.put(CoreAttributes.PRIORITY.key(), "2");
+        attrsPrin1.put(CoreAttributes.PRIORITY.key(), "-1");
+        attrsPriA.put(CoreAttributes.PRIORITY.key(), "A");
+        attrsPriB.put(CoreAttributes.PRIORITY.key(), "B");
+        attrsPriLP.put(CoreAttributes.PRIORITY.key(), "5432123456789");
+        attrsPriLN.put(CoreAttributes.PRIORITY.key(), "-5432123456789");
     }
 
     @Test


[14/41] nifi git commit: NIFI-1031 resolve reporting task error in flow.xml

Posted by ma...@apache.org.
NIFI-1031 resolve reporting task error in flow.xml

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f7981272
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f7981272
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f7981272

Branch: refs/heads/NIFI-810-InputRequirement
Commit: f798127290c5a0bf7fba9ea0aae8cbb733e67f40
Parents: b4bfcc1
Author: Mike Moser <mo...@gmail.com>
Authored: Thu Oct 8 20:50:28 2015 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Oct 9 10:53:23 2015 -0400

----------------------------------------------------------------------
 .../nifi-framework-core/src/main/resources/FlowConfiguration.xsd   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f7981272/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 1809554..56f08a6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -364,7 +364,7 @@
             <xs:element name="comment" type="xs:string" />
             <xs:element name="class" type="NonEmptyStringType" />
             <xs:element name="schedulingPeriod" type="NonEmptyStringType"/>
-            <xs:element name="schedulingState" type="ScheduledState" />
+            <xs:element name="scheduledState" type="ScheduledState" />
             <xs:element name="schedulingStrategy" type="SchedulingStrategy" />
     		
             <xs:element name="property" type="PropertyType" minOccurs="0" maxOccurs="unbounded"/>


[35/41] nifi git commit: NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

Posted by ma...@apache.org.
NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ba3225fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ba3225fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ba3225fe

Branch: refs/heads/NIFI-810-InputRequirement
Commit: ba3225fe92258a6aca3cb706412ab62955914dc8
Parents: da28b81
Author: Toivo Adams <to...@gmail.com>
Authored: Thu Oct 1 17:22:08 2015 +0300
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:28:03 2015 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   9 ++
 .../processors/standard/util/JdbcCommon.java    |  77 ++++++++--
 .../standard/util/TestJdbcTypesDerby.java       | 137 +++++++++++++++++
 .../standard/util/TestJdbcTypesH2.java          | 149 +++++++++++++++++++
 4 files changed, 357 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d94981..b0b3afa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -190,6 +190,15 @@ language governing permissions and limitations under the License. -->
             <artifactId>derby</artifactId>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.187</version>
+            <scope>test</scope>
+        </dependency>
+              
+        
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 6fc69ff..de3d5d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,15 +16,20 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
 import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
 import static java.sql.Types.DATE;
 import static java.sql.Types.DECIMAL;
 import static java.sql.Types.DOUBLE;
 import static java.sql.Types.FLOAT;
 import static java.sql.Types.INTEGER;
 import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
 import static java.sql.Types.LONGVARCHAR;
 import static java.sql.Types.NCHAR;
 import static java.sql.Types.NUMERIC;
@@ -35,10 +40,12 @@ import static java.sql.Types.SMALLINT;
 import static java.sql.Types.TIME;
 import static java.sql.Types.TIMESTAMP;
 import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
 import static java.sql.Types.VARCHAR;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -70,17 +77,34 @@ public class JdbcCommon {
             long nrOfRows = 0;
             while (rs.next()) {
                 for (int i = 1; i <= nrOfColumns; i++) {
+                    final int javaSqlType = meta.getColumnType(i);
                     final Object value = rs.getObject(i);
 
-                    // The different types that we support are numbers (int, long, double, float),
-                    // as well as boolean values and Strings. Since Avro doesn't provide
-                    // timestamp types, we want to convert those to Strings. So we will cast anything other
-                    // than numbers or booleans to strings by using to toString() method.
                     if (value == null) {
                         rec.put(i - 1, null);
+
+                    } else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
+                        // byte types require slightly different handling
+                        byte[] bytes = rs.getBytes(i);
+                        ByteBuffer bb = ByteBuffer.wrap(bytes);
+                        rec.put(i - 1, bb);
+
+                    } else if (value instanceof Byte) {
+                        // tinyint(1) columns are reported by the JDBC driver as
+                        // java.sql.Types.TINYINT, but the value itself comes back as
+                        // java.lang.Byte (at least with the H2 driver); putting that
+                        // directly into the Avro record fails with:
+                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+                        rec.put(i - 1, ((Byte) value).intValue());
+
                     } else if (value instanceof Number || value instanceof Boolean) {
                         rec.put(i - 1, value);
+
                     } else {
+                        // The different types that we support are numbers (int, long, double, float),
+                        // as well as boolean values and Strings. Since Avro doesn't provide
+                        // timestamp types, we want to convert those to Strings. So we will cast anything other
+                        // than numbers or booleans to strings by using to toString() method.
                         rec.put(i - 1, value.toString());
                     }
                 }
@@ -110,53 +134,76 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+
+
                     break;
 
                 case BOOLEAN:
-                    builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
-                    break;
+//                    builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+                   break;
 
                 case INTEGER:
                 case SMALLINT:
                 case TINYINT:
-                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
                     break;
 
                 case BIGINT:
-                    builder.name(meta.getColumnName(i)).type().longType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
                     break;
 
                 // java.sql.RowId is interface, is seems to be database
                 // implementation specific, let's convert to String
                 case ROWID:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 case FLOAT:
                 case REAL:
-                    builder.name(meta.getColumnName(i)).type().floatType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
                     break;
 
                 case DOUBLE:
-                    builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DATE:
                 case TIME:
                 case TIMESTAMP:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
-                default:
+                case BINARY:
+                case VARBINARY:
+                case LONGVARBINARY:
+                case ARRAY:
+                case BLOB:
+                case CLOB:
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
                     break;
+
+
+                default:
+                    throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
             }
         }
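
For the new binary branch, the value side has to match the bytes union in the schema: rs.getBytes(i) is wrapped in a ByteBuffer before being put into the record. A self-contained illustration, with placeholder record and field names:

// Illustrative only: a binary column value stored as a ByteBuffer against a ["null","bytes"] union.
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class BinaryColumnSketch {
    public static void main(final String[] args) {
        final Schema schema = SchemaBuilder.record("users").namespace("any.data").fields()
                .name("somebinary").type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault()
                .endRecord();
        final GenericData.Record rec = new GenericData.Record(schema);
        // mirrors the diff: ByteBuffer.wrap(rs.getBytes(i))
        rec.put("somebinary", ByteBuffer.wrap(new byte[] {0x66, (byte) 0xFF}));
        System.out.println(rec);
    }
}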
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
new file mode 100644
index 0000000..cf3d0c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Useless test: Derby is so different from MySQL
+ * that it is impossible to reproduce the MySQL problems here.
+ *
+ *
+ */
+@Ignore
+public class TestJdbcTypesDerby {
+
+    final static String DB_LOCATION = "target/db";
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    String createTable = "create table users ("
+            + "  id int NOT NULL GENERATED ALWAYS AS IDENTITY, "
+            + "  email varchar(255) NOT NULL UNIQUE, "
+            + "  password varchar(255) DEFAULT NULL, "
+            + "  activation_code varchar(255) DEFAULT NULL, "
+            + "  forgotten_password_code varchar(255) DEFAULT NULL, "
+            + "  forgotten_password_time datetime DEFAULT NULL, "
+            + "  created datetime NOT NULL, "
+            + "  active tinyint NOT NULL DEFAULT 0, "
+            + "  home_module_id int DEFAULT NULL, "
+            + "   PRIMARY KEY (id) ) " ;
+//            + "   UNIQUE email ) " ;
+//            + "   KEY home_module_id (home_module_id) ) " ;
+//            + "   CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES "
+//            + "  modules (id) ON DELETE SET NULL " ;
+
+    String dropTable = "drop table users";
+
+    @Test
+    public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+       // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        final Connection con = createConnection();
+        final Statement st = con.createStatement();
+
+        try {
+            st.executeUpdate(dropTable);
+        } catch (final Exception e) {
+            // table may not exist, this is not serious problem.
+        }
+
+        st.executeUpdate(createTable);
+
+        st.executeUpdate("insert into users (email, password, activation_code, created, active) "
+                           + " values ('robert.gates@cold.com', '******', 'CAS', '2005-12-09', 'Y')");
+
+        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+        final byte[] serializedBytes = outStream.toByteArray();
+        assertNotNull(serializedBytes);
+        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+        st.close();
+        con.close();
+
+        // Deserialize bytes to records
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // many test use Derby as database, so ensure driver is available
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException {
+        final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        assertNotNull(clazz);
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+        return con;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
new file mode 100644
index 0000000..e3041b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJdbcTypesH2 {
+
+    final static String DB_LOCATION = "~/var/test/h2";
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    String createTable = "    CREATE TABLE `users` ( "
+            + "  `id` int(11) NOT NULL AUTO_INCREMENT, "
+            + "  `email` varchar(255) NOT NULL, "
+            + "  `password` varchar(255) DEFAULT NULL, "
+            + "  `activation_code` varchar(255) DEFAULT NULL, "
+            + "  `forgotten_password_code` varchar(255) DEFAULT NULL, "
+            + "  `forgotten_password_time` datetime DEFAULT NULL, "
+            + "  `created` datetime NOT NULL, "
+            + "  `active` tinyint(1) NOT NULL DEFAULT '0', "
+            + "  `home_module_id` int(11) DEFAULT NULL, "
+
+            + "  somebinary BINARY default null, "
+            + "  somebinary2 VARBINARY default null, "
+            + "  somebinary3 LONGVARBINARY default null, "
+            + "  somearray   ARRAY default null, "
+            + "  someblob BLOB default null, "
+            + "  someclob CLOB default null, "
+
+            + "  PRIMARY KEY (`id`), "
+            + "  UNIQUE KEY `email` (`email`) ) " ;
+//            + "  KEY `home_module_id` (`home_module_id`) )" ;
+/*            + "  CONSTRAINT `users_ibfk_1` FOREIGN KEY (`home_module_id`) REFERENCES "
+            + "`modules` (`id`) ON DELETE SET NULL "
+            + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 " ;
+  */
+
+    String dropTable = "drop table users";
+
+    @Test
+    public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+       // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        final Connection con = createConnection();
+        final Statement st = con.createStatement();
+
+        try {
+            st.executeUpdate(dropTable);
+        } catch (final Exception e) {
+            // table may not exist, this is not serious problem.
+        }
+
+        st.executeUpdate(createTable);
+
+//        st.executeUpdate("insert into users (email, password, activation_code, forgotten_password_code, forgotten_password_time, created, active, home_module_id) "
+//                + " values ('robert.gates@cold.com', '******', 'CAS', 'ounou', '2005-12-09', '2005-12-03', 1, 5)");
+
+        st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) "
+                + " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, '66FF', 'ABDF', 'EE64', 'BB22', 'CC88')");
+
+        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+//      final ResultSet resultSet = st.executeQuery("select U.active from users U");
+//        final ResultSet resultSet = st.executeQuery("select U.somebinary from users U");
+
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+        final byte[] serializedBytes = outStream.toByteArray();
+        assertNotNull(serializedBytes);
+        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+        st.close();
+        con.close();
+
+        // Deserialize bytes to records
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // verify that the H2 driver loads and a Connection can be obtained
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException, SQLException {
+        Connection con = createConnection();
+
+        assertNotNull(con);
+        con.close();
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+        // no Class.forName needed: the H2 driver self-registers via the JDBC 4 ServiceLoader mechanism.
+        // Note that H2 expands '~' in file URLs to the user's home directory.
+        String connectionString = "jdbc:h2:file:" + DB_LOCATION + "/testdb7";
+        final Connection con = DriverManager.getConnection(connectionString, "SA", "");
+        return con;
+    }
+
+}
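
The test above round-trips H2 rows through JdbcCommon.convertToAvroStream and prints each record. To see how the individual JDBC column types were mapped, it is enough to read back the embedded writer schema from the Avro container header; a minimal sketch, reusing the serializedBytes array produced in the test:

    final DatumReader<GenericRecord> schemaReader = new GenericDatumReader<>();
    try (final DataFileStream<GenericRecord> stream =
            new DataFileStream<>(new ByteArrayInputStream(serializedBytes), schemaReader)) {
        // the writer's schema is stored in the container header, so no records need to be read
        System.out.println(stream.getSchema().toString(true)); // pretty-printed field-to-type mapping
    }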


[30/41] nifi git commit: NIFI-944 Added support and unit tests for escaped characters in ConvertCSVtoAvro processor properties. This closes #87. Reviewed by Tony Kurc

Posted by ma...@apache.org.
NIFI-944 Added support and unit tests for escaped characters in ConvertCSVtoAvro processor properties. This closes #87. Reviewed by Tony Kurc <tk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e68fdca5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e68fdca5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e68fdca5

Branch: refs/heads/NIFI-810-InputRequirement
Commit: e68fdca517eac53700c0f38bbaa7a893cfc28d9c
Parents: 518670d
Author: Joe <jo...@impresstv.com>
Authored: Tue Oct 20 23:48:54 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Oct 21 00:01:38 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/kite/ConvertCSVToAvro.java  | 17 ++++++++-
 .../processors/kite/TestCSVToAvroProcessor.java | 40 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e68fdca5/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 6c20a8f..ea84daa 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -30,6 +30,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -53,6 +54,7 @@ import org.kitesdk.data.spi.DefaultConfiguration;
 import org.kitesdk.data.spi.filesystem.CSVFileReader;
 import org.kitesdk.data.spi.filesystem.CSVProperties;
 
+
 import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
 
 @Tags({"kite", "csv", "avro"})
@@ -66,11 +68,15 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
         @Override
         public ValidationResult validate(String subject, String input,
                 ValidationContext context) {
+            // Allows special, escaped characters as input, which are then unescaped and converted to a single character.
+            // Examples of special characters: \t (or \u0009) and \f.
+            input = unescapeString(input);
+
             return new ValidationResult.Builder()
                     .subject(subject)
                     .input(input)
-                    .explanation("Only single characters are supported")
-                    .valid(input.length() == 1)
+                    .explanation("Only non-null single characters are supported")
+                    .valid(input.length() == 1 && input.charAt(0) != 0)
                     .build();
         }
     };
@@ -295,4 +301,11 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
             session.transfer(incomingCSV, FAILURE);
         }
     }
+
+    private static String unescapeString(String input) {
+        if (input.length() > 1) {
+            input = StringEscapeUtils.unescapeJava(input);
+        }
+        return input;
+    }
 }
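
The validator above accepts either a literal single character or a Java-style escape sequence: StringEscapeUtils.unescapeJava (from commons-lang3, imported in the diff) collapses the escape into the character it denotes before the single-character check runs. A minimal standalone sketch of that behavior (the UnescapeDemo class is illustrative, not part of the commit):

    import org.apache.commons.lang3.StringEscapeUtils;

    public class UnescapeDemo {
        public static void main(String[] args) {
            // what a user types into the processor property: a backslash followed by 't'
            final String raw = "\\t";
            final String unescaped = StringEscapeUtils.unescapeJava(raw);
            System.out.println(unescaped.length());        // 1 -> passes the single-character check
            System.out.println((int) unescaped.charAt(0)); // 9 -> the tab code point
        }
    }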

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68fdca5/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 43dea6e..0cde23c 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -48,9 +48,49 @@ public class TestCSVToAvroProcessor {
     public static final String FAILURE_CONTENT = ""
             + ",blue,\n"; // invalid, ID is missing
 
+    public static final String TSV_CONTENT = ""
+            + "1\tgreen\n"
+            + "\tblue\t\n" // invalid, ID is missing
+            + "2\tgrey\t12.95";
+
     public static final String FAILURE_SUMMARY = "" +
             "Field id: cannot make \"long\" value: '': Field id type:LONG pos:0 not set and has no default value";
 
+    /**
+     * Basic test for tab-separated files, mirroring the basic CSV conversion test below.
+     * @throws IOException if the test input stream cannot be created
+     */
+    @Test
+    public void testTabSeparatedConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.setProperty(ConvertCSVToAvro.DELIMITER, "\\t");
+        runner.assertValid();
+
+        runner.enqueue(streamFor(TSV_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 1);
+
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
+        String failureContent = new String(runner.getContentAsByteArray(incompatible),
+                StandardCharsets.UTF_8);
+
+        Assert.assertEquals("Should reject an invalid string and double",
+                TSV_CONTENT, failureContent);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+
     @Test
     public void testBasicConversion() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);