You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/23 16:13:57 UTC

[1/5] nifi git commit: NIFI-972 attribute to indicate rows count and cleanup

Repository: nifi
Updated Branches:
  refs/heads/master a5a5badb8 -> 0fc5d3046


NIFI-972 attribute to indicate rows count and cleanup

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9e53250
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9e53250
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9e53250

Branch: refs/heads/master
Commit: a9e5325047fbe3ad8a7d53b664e7944e39fcf658
Parents: ba3225f
Author: Toivo Adams <to...@gmail.com>
Authored: Fri Oct 23 12:44:27 2015 +0300
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:28:03 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/ExecuteSQL.java    |  9 ++++++++-
 .../nifi/processors/standard/util/JdbcCommon.java      | 13 -------------
 2 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a9e53250/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 45fd1a8..2a13f32 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -53,9 +53,13 @@ import org.apache.nifi.util.StopWatch;
     + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
         "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
         "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
-        "select query.")
+        "select query. " +
+        "FlowFile attribute 'executesql.row.count' indicates how many rows were selected."
+        )
 public class ExecuteSQL extends AbstractProcessor {
 
+    public static final String RESULT_ROW_COUNT = "executesql.row.count";
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -153,6 +157,9 @@ public class ExecuteSQL extends AbstractProcessor {
                 }
             });
 
+            // set attribute how many rows were selected
+            outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString());
+
             logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() });
             logger.info("Transferred {} to 'success'", new Object[] { outgoing });
             session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));

http://git-wip-us.apache.org/repos/asf/nifi/blob/a9e53250/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index de3d5d1..9cf9338 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -134,53 +134,41 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
-//                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
-//                    builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
-
-
                     break;
 
                 case BOOLEAN:
-//                    builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
                    break;
 
                 case INTEGER:
                 case SMALLINT:
                 case TINYINT:
-//                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
                     break;
 
                 case BIGINT:
-//                    builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
                     break;
 
                 // java.sql.RowId is interface, is seems to be database
                 // implementation specific, let's convert to String
                 case ROWID:
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 case FLOAT:
                 case REAL:
-//                    builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
                     break;
 
                 case DOUBLE:
-//                    builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
@@ -188,7 +176,6 @@ public class JdbcCommon {
                 case DATE:
                 case TIME:
                 case TIMESTAMP:
-//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 


[5/5] nifi git commit: NIFI-1055: Fixed checkstyle violations

Posted by ma...@apache.org.
NIFI-1055: Fixed checkstyle violations


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0fc5d304
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0fc5d304
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0fc5d304

Branch: refs/heads/master
Commit: 0fc5d3046178836365f710d312cef6568126a99d
Parents: 5d90c9b
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:59:24 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 10:08:44 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java |  42 ++++----
 .../processors/kite/TestCSVToAvroProcessor.java |   1 -
 .../nifi/processors/standard/ListenHTTP.java    | 108 +++++++++----------
 .../standard/PutDistributedMapCache.java        |  96 +++++++++--------
 .../standard/TestPutDistributedMapCache.java    |  31 +++---
 5 files changed, 140 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 016750b..f0ba71a 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -50,7 +50,7 @@ import org.apache.nifi.processor.io.StreamCallback;
 
 @SideEffectFree
 @SupportsBatching
-@Tags({ "json", "avro", "binary" })
+@Tags({"json", "avro", "binary"})
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
     + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
@@ -60,41 +60,41 @@ public class ConvertAvroToJSON extends AbstractProcessor {
     protected static final String CONTAINER_ARRAY = "array";
     protected static final String CONTAINER_NONE = "none";
 
-    static final PropertyDescriptor CONTAINER_OPTIONS
-            = new PropertyDescriptor.Builder()
-            .name("JSON container options")
-            .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
-            .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
-            .required(true)
-            .defaultValue(CONTAINER_ARRAY)
-            .build();
+    static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
+        .name("JSON container options")
+        .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE
+            + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
+        .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
+        .required(true)
+        .defaultValue(CONTAINER_ARRAY)
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("A FlowFile is routed to this relationship after it has been converted to JSON")
-            .build();
+        .name("success")
+        .description("A FlowFile is routed to this relationship after it has been converted to JSON")
+        .build();
     static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
-            .build();
+        .name("failure")
+        .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
+        .build();
 
-    
 
     private List<PropertyDescriptor> properties;
-    
+
     @Override
     protected void init(ProcessorInitializationContext context) {
         super.init(context);
-        
+
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONTAINER_OPTIONS);
         this.properties = Collections.unmodifiableList(properties);
-    
     }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
     }
+
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> rels = new HashSet<>();
@@ -118,8 +118,8 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn);
 
-                         final OutputStream out = new BufferedOutputStream(rawOut);
-                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+                        final OutputStream out = new BufferedOutputStream(rawOut);
+                        final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
                         final GenericData genericData = GenericData.get();
                         GenericRecord record = reader.next();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 0cde23c..902ec79 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -58,7 +58,6 @@ public class TestCSVToAvroProcessor {
 
     /**
      * Basic test for tab separated files, similar to #test
-     * @throws IOException
      */
     @Test
     public void testTabSeparatedConversion() throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index a446eb6..9ad1703 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -31,10 +31,12 @@ import java.util.regex.Pattern;
 import javax.servlet.Servlet;
 import javax.ws.rs.Path;
 
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
@@ -42,15 +44,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
 import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
 import org.apache.nifi.ssl.SSLContextService;
-
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import org.apache.nifi.stream.io.StreamThrottler;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -70,56 +69,56 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     private List<PropertyDescriptor> properties;
 
     public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Relationship for successfully received FlowFiles")
-            .build();
+        .name("success")
+        .description("Relationship for successfully received FlowFiles")
+        .build();
 
     public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
-            .name("Base Path")
-            .description("Base path for incoming connections")
-            .required(true)
-            .defaultValue("contentListener")
-            .addValidator(StandardValidators.URI_VALIDATOR)
-            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
-            .build();
+        .name("Base Path")
+        .description("Base path for incoming connections")
+        .required(true)
+        .defaultValue("contentListener")
+        .addValidator(StandardValidators.URI_VALIDATOR)
+        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+        .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Listening Port")
-            .description("The Port to listen on for incoming connections")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
+        .name("Listening Port")
+        .description("The Port to listen on for incoming connections")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
     public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
-            .name("Authorized DN Pattern")
-            .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
-            .required(true)
-            .defaultValue(".*")
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .build();
+        .name("Authorized DN Pattern")
+        .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
+        .required(true)
+        .defaultValue(".*")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
     public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
-            .name("Max Unconfirmed Flowfile Time")
-            .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
-            .required(true)
-            .defaultValue("60 secs")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
+        .name("Max Unconfirmed Flowfile Time")
+        .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
+        .required(true)
+        .defaultValue("60 secs")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
     public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
-            .name("Max Data to Receive per Second")
-            .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
+        .name("Max Data to Receive per Second")
+        .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an SSL Context")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
+        .name("SSL Context Service")
+        .description("The Controller Service to use in order to obtain an SSL Context")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
     public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
-            .name("HTTP Headers to receive as Attributes (Regex)")
-            .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .required(false)
-            .build();
+        .name("HTTP Headers to receive as Attributes (Regex)")
+        .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .required(false)
+        .build();
 
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
@@ -173,7 +172,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             toShutdown.stop();
             toShutdown.destroy();
         } catch (final Exception ex) {
-            getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
+            getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[] {ex});
             this.server = null;
         }
     }
@@ -235,18 +234,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         connector.setPort(port);
 
         // add the connector to the server
-        server.setConnectors(new Connector[]{connector});
+        server.setConnectors(new Connector[] {connector});
 
         final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
         for (final Class<? extends Servlet> cls : getServerClasses()) {
             final Path path = cls.getAnnotation(Path.class);
             // Note: servlets must have a path annotation - this will NPE otherwise
             // also, servlets other than ListenHttpServlet must have a path starting with /
-            if(basePath.isEmpty() && !path.value().isEmpty()){
+            if (basePath.isEmpty() && !path.value().isEmpty()) {
                 // Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
                 contextHandler.addServlet(cls, path.value());
-            }
-            else{
+            } else {
                 contextHandler.addServlet(cls, "/" + basePath + path.value());
             }
         }
@@ -304,7 +302,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         for (final String id : findOldFlowFileIds(context)) {
             final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
             if (wrapper != null) {
-                getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
+                getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[] {id});
                 wrapper.session.rollback();
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
index 8e50c9f..bc1fde5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDistributedMapCache.java
@@ -16,6 +16,16 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -33,24 +43,22 @@ import org.apache.nifi.distributed.cache.client.exception.SerializationException
 import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
 @EventDriven
 @SupportsBatching
 @Tags({"map", "cache", "put", "distributed"})
 @CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key " +
-        "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
-        "'keep original' the entry is not replaced.'")
+    "computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
+    "'keep original' the entry is not replaced.'")
 @WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
-        "attribute is true, is the FlowFile is cached, otherwise false.")
+    "attribute is true, is the FlowFile is cached, otherwise false.")
 @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
 public class PutDistributedMapCache extends AbstractProcessor {
 
@@ -58,55 +66,55 @@ public class PutDistributedMapCache extends AbstractProcessor {
 
     // Identifies the distributed map cache client
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
-            .name("Distributed Cache Service")
-            .description("The Controller Service that is used to cache flow files")
-            .required(true)
-            .identifiesControllerService(DistributedMapCacheClient.class)
-            .build();
+        .name("Distributed Cache Service")
+        .description("The Controller Service that is used to cache flow files")
+        .required(true)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
 
     // Selects the FlowFile attribute, whose value is used as cache key
     public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
-            .name("Cache Entry Identifier")
-            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
-                    "be evaluated against a FlowFile in order to determine the cache key")
-            .required(true)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Cache Entry Identifier")
+        .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
+            "be evaluated against a FlowFile in order to determine the cache key")
+        .required(true)
+        .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
+        .expressionLanguageSupported(true)
+        .build();
 
     public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present",
-            "Adds the specified entry to the cache, replacing any value that is currently set.");
+        "Adds the specified entry to the cache, replacing any value that is currently set.");
 
     public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
-            "Adds the specified entry to the cache, if the key does not exist.");
+        "Adds the specified entry to the cache, if the key does not exist.");
 
     public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Cache update strategy")
-            .description("Determines how the cache is updated if the cache already contains the entry")
-            .required(true)
-            .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
-            .defaultValue(CACHE_UPDATE_REPLACE.getValue())
-            .build();
+        .name("Cache update strategy")
+        .description("Determines how the cache is updated if the cache already contains the entry")
+        .required(true)
+        .allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
+        .defaultValue(CACHE_UPDATE_REPLACE.getValue())
+        .build();
 
     public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder()
-            .name("Max cache entry size")
-            .description("The maximum amount of data to put into cache")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .expressionLanguageSupported(false)
-            .build();
+        .name("Max cache entry size")
+        .description("The maximum amount of data to put into cache")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .expressionLanguageSupported(false)
+        .build();
 
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
-            .build();
+        .name("success")
+        .description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
+        .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
-            .build();
+        .name("failure")
+        .description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
+        .build();
     private final Set<Relationship> relationships;
 
     private final Serializer<String> keySerializer = new StringSerializer();
@@ -207,7 +215,7 @@ public class PutDistributedMapCache extends AbstractProcessor {
         } catch (final IOException e) {
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
-            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
+            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0fc5d304/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
index 8347e7f..05d4293 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -26,22 +34,11 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.junit.Assert.assertEquals;
 
 public class TestPutDistributedMapCache {
 
     private TestRunner runner;
     private MockCacheClient service;
-    private PutDistributedMapCache processor;
 
     @Before
     public void setup() throws InitializationException {
@@ -57,7 +54,7 @@ public class TestPutDistributedMapCache {
     public void testNoCacheKey() throws InitializationException {
 
         runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
-        runner.enqueue(new byte[]{});
+        runner.enqueue(new byte[] {});
 
         runner.run();
 
@@ -99,7 +96,7 @@ public class TestPutDistributedMapCache {
         props.put("caheKeyAttribute", "2");
 
         // flow file without content
-        runner.enqueue(new byte[]{}, props);
+        runner.enqueue(new byte[] {}, props);
 
         runner.run();
 
@@ -171,7 +168,7 @@ public class TestPutDistributedMapCache {
 
         runner.clearTransferState();
 
-        //we expect that the cache entry is replaced
+        // we expect that the cache entry is replaced
         value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
         assertEquals(replaced, new String(value, "UTF-8"));
     }
@@ -215,7 +212,7 @@ public class TestPutDistributedMapCache {
 
         runner.clearTransferState();
 
-        //we expect that the cache entry is NOT replaced
+        // we expect that the cache entry is NOT replaced
         value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
         assertEquals(original, new String(value, "UTF-8"));
     }
@@ -225,7 +222,7 @@ public class TestPutDistributedMapCache {
         private boolean failOnCalls = false;
 
         private void verifyNotFail() throws IOException {
-            if ( failOnCalls ) {
+            if (failOnCalls) {
                 throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
             }
         }
@@ -240,7 +237,7 @@ public class TestPutDistributedMapCache {
         @Override
         @SuppressWarnings("unchecked")
         public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
-                                          final Deserializer<V> valueDeserializer) throws IOException {
+            final Deserializer<V> valueDeserializer) throws IOException {
             verifyNotFail();
             return (V) values.putIfAbsent(key, value);
         }


[4/5] nifi git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi

Posted by ma...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5d90c9be
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5d90c9be
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5d90c9be

Branch: refs/heads/master
Commit: 5d90c9be07b40455c831e5a602eeeb6660cdd8c6
Parents: bd506b1 a5a5bad
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:52:33 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:52:33 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 49 ++++++++++++++++++--
 .../processors/avro/TestConvertAvroToJSON.java  | 47 +++++++++++++++++--
 2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[2/5] nifi git commit: NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

Posted by ma...@apache.org.
NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ba3225fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ba3225fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ba3225fe

Branch: refs/heads/master
Commit: ba3225fe92258a6aca3cb706412ab62955914dc8
Parents: da28b81
Author: Toivo Adams <to...@gmail.com>
Authored: Thu Oct 1 17:22:08 2015 +0300
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:28:03 2015 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   9 ++
 .../processors/standard/util/JdbcCommon.java    |  77 ++++++++--
 .../standard/util/TestJdbcTypesDerby.java       | 137 +++++++++++++++++
 .../standard/util/TestJdbcTypesH2.java          | 149 +++++++++++++++++++
 4 files changed, 357 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d94981..b0b3afa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -190,6 +190,15 @@ language governing permissions and limitations under the License. -->
             <artifactId>derby</artifactId>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.187</version>
+            <scope>test</scope>
+        </dependency>
+              
+        
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 6fc69ff..de3d5d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,15 +16,20 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
 import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
 import static java.sql.Types.DATE;
 import static java.sql.Types.DECIMAL;
 import static java.sql.Types.DOUBLE;
 import static java.sql.Types.FLOAT;
 import static java.sql.Types.INTEGER;
 import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
 import static java.sql.Types.LONGVARCHAR;
 import static java.sql.Types.NCHAR;
 import static java.sql.Types.NUMERIC;
@@ -35,10 +40,12 @@ import static java.sql.Types.SMALLINT;
 import static java.sql.Types.TIME;
 import static java.sql.Types.TIMESTAMP;
 import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
 import static java.sql.Types.VARCHAR;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -70,17 +77,34 @@ public class JdbcCommon {
             long nrOfRows = 0;
             while (rs.next()) {
                 for (int i = 1; i <= nrOfColumns; i++) {
+                    final int javaSqlType = meta.getColumnType(i);
                     final Object value = rs.getObject(i);
 
-                    // The different types that we support are numbers (int, long, double, float),
-                    // as well as boolean values and Strings. Since Avro doesn't provide
-                    // timestamp types, we want to convert those to Strings. So we will cast anything other
-                    // than numbers or booleans to strings by using to toString() method.
                     if (value == null) {
                         rec.put(i - 1, null);
+
+                    } else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
+                        // bytes requires little bit different handling
+                        byte[] bytes = rs.getBytes(i);
+                        ByteBuffer bb = ByteBuffer.wrap(bytes);
+                        rec.put(i - 1, bb);
+
+                    } else if (value instanceof Byte) {
+                        // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
+                        // But value is returned by JDBC as java.lang.Byte
+                        // (at least H2 JDBC works this way)
+                        // direct put to avro record results:
+                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+                        rec.put(i - 1, ((Byte) value).intValue());
+
                     } else if (value instanceof Number || value instanceof Boolean) {
                         rec.put(i - 1, value);
+
                     } else {
+                        // The different types that we support are numbers (int, long, double, float),
+                        // as well as boolean values and Strings. Since Avro doesn't provide
+                        // timestamp types, we want to convert those to Strings. So we will cast anything other
+                        // than numbers or booleans to strings by using to toString() method.
                         rec.put(i - 1, value.toString());
                     }
                 }
@@ -110,53 +134,76 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+
+
                     break;
 
                 case BOOLEAN:
-                    builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
-                    break;
+//                    builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+                   break;
 
                 case INTEGER:
                 case SMALLINT:
                 case TINYINT:
-                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
                     break;
 
                 case BIGINT:
-                    builder.name(meta.getColumnName(i)).type().longType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
                     break;
 
                 // java.sql.RowId is interface, is seems to be database
                 // implementation specific, let's convert to String
                 case ROWID:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 case FLOAT:
                 case REAL:
-                    builder.name(meta.getColumnName(i)).type().floatType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
                     break;
 
                 case DOUBLE:
-                    builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DATE:
                 case TIME:
                 case TIMESTAMP:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
-                default:
+                case BINARY:
+                case VARBINARY:
+                case LONGVARBINARY:
+                case ARRAY:
+                case BLOB:
+                case CLOB:
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
                     break;
+
+
+                default:
+                    throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
new file mode 100644
index 0000000..cf3d0c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ *  Useless test: Derby is so different from MySQL
+ * that it is impossible to reproduce MySQL-specific problems with it.
+ *
+ *
+ */
+@Ignore
+public class TestJdbcTypesDerby {
+
+    final static String DB_LOCATION = "target/db";
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    String createTable = "create table users ("
+            + "  id int NOT NULL GENERATED ALWAYS AS IDENTITY, "
+            + "  email varchar(255) NOT NULL UNIQUE, "
+            + "  password varchar(255) DEFAULT NULL, "
+            + "  activation_code varchar(255) DEFAULT NULL, "
+            + "  forgotten_password_code varchar(255) DEFAULT NULL, "
+            + "  forgotten_password_time datetime DEFAULT NULL, "
+            + "  created datetime NOT NULL, "
+            + "  active tinyint NOT NULL DEFAULT 0, "
+            + "  home_module_id int DEFAULT NULL, "
+            + "   PRIMARY KEY (id) ) " ;
+//            + "   UNIQUE email ) " ;
+//            + "   KEY home_module_id (home_module_id) ) " ;
+//            + "   CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES "
+//            + "  modules (id) ON DELETE SET NULL " ;
+
+    String dropTable = "drop table users";
+
+    @Test
+    public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+       // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        final Connection con = createConnection();
+        final Statement st = con.createStatement();
+
+        try {
+            st.executeUpdate(dropTable);
+        } catch (final Exception e) {
+            // table may not exist; this is not a serious problem.
+        }
+
+        st.executeUpdate(createTable);
+
+        st.executeUpdate("insert into users (email, password, activation_code, created, active) "
+                           + " values ('robert.gates@cold.com', '******', 'CAS', '2005-12-09', 'Y')");
+
+        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+        final byte[] serializedBytes = outStream.toByteArray();
+        assertNotNull(serializedBytes);
+        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+        st.close();
+        con.close();
+
+        // Deserialize bytes to records
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // many tests use Derby as the database, so ensure the driver is available
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException {
+        final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        assertNotNull(clazz);
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+        return con;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
new file mode 100644
index 0000000..e3041b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJdbcTypesH2 {
+
+    final static String DB_LOCATION = "~/var/test/h2";
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    String createTable = "    CREATE TABLE `users` ( "
+            + "  `id` int(11) NOT NULL AUTO_INCREMENT, "
+            + "  `email` varchar(255) NOT NULL, "
+            + "  `password` varchar(255) DEFAULT NULL, "
+            + "  `activation_code` varchar(255) DEFAULT NULL, "
+            + "  `forgotten_password_code` varchar(255) DEFAULT NULL, "
+            + "  `forgotten_password_time` datetime DEFAULT NULL, "
+            + "  `created` datetime NOT NULL, "
+            + "  `active` tinyint(1) NOT NULL DEFAULT '0', "
+            + "  `home_module_id` int(11) DEFAULT NULL, "
+
+            + "  somebinary BINARY default null, "
+            + "  somebinary2 VARBINARY default null, "
+            + "  somebinary3 LONGVARBINARY default null, "
+            + "  somearray   ARRAY default null, "
+            + "  someblob BLOB default null, "
+            + "  someclob CLOB default null, "
+
+            + "  PRIMARY KEY (`id`), "
+            + "  UNIQUE KEY `email` (`email`) ) " ;
+//            + "  KEY `home_module_id` (`home_module_id`) )" ;
+/*            + "  CONSTRAINT `users_ibfk_1` FOREIGN KEY (`home_module_id`) REFERENCES "
+            + "`modules` (`id`) ON DELETE SET NULL "
+            + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 " ;
+  */
+
+    String dropTable = "drop table users";
+
+    @Test
+    public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+       // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        final Connection con = createConnection();
+        final Statement st = con.createStatement();
+
+        try {
+            st.executeUpdate(dropTable);
+        } catch (final Exception e) {
+            // table may not exist; this is not a serious problem.
+        }
+
+        st.executeUpdate(createTable);
+
+//        st.executeUpdate("insert into users (email, password, activation_code, forgotten_password_code, forgotten_password_time, created, active, home_module_id) "
+//                + " values ('robert.gates@cold.com', '******', 'CAS', 'ounou', '2005-12-09', '2005-12-03', 1, 5)");
+
+        st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) "
+                + " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, '66FF', 'ABDF', 'EE64', 'BB22', 'CC88')");
+
+        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+//      final ResultSet resultSet = st.executeQuery("select U.active from users U");
+//        final ResultSet resultSet = st.executeQuery("select U.somebinary from users U");
+
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+        final byte[] serializedBytes = outStream.toByteArray();
+        assertNotNull(serializedBytes);
+        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+        st.close();
+        con.close();
+
+        // Deserialize bytes to records
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // verify that the H2 driver loads and that obtaining a Connection works
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException, SQLException {
+//        final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+
+        Connection con = createConnection();
+
+        assertNotNull(con);
+        con.close();
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+//        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        String connectionString = "jdbc:h2:file:" + DB_LOCATION + "/testdb7";
+        final Connection con = DriverManager.getConnection(connectionString, "SA", "");
+        return con;
+    }
+
+}


[3/5] nifi git commit: NIFI-972: Added additional unit test; deleted lines that were commented out

Posted by ma...@apache.org.
NIFI-972: Added additional unit test; deleted lines that were commented out


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bd506b1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bd506b1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bd506b1e

Branch: refs/heads/master
Commit: bd506b1e10ebc2ce025e836f83cb0f77562deba4
Parents: a9e5325
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 09:52:09 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:52:09 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/util/JdbcCommon.java    |  8 ++--
 .../standard/util/TestJdbcCommon.java           | 42 ++++++++++++++++++++
 .../standard/util/TestJdbcTypesDerby.java       |  4 --
 3 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bd506b1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 9cf9338..937dcab 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard.util;
 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
 import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
 import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
 import static java.sql.Types.CHAR;
@@ -83,7 +84,7 @@ public class JdbcCommon {
                     if (value == null) {
                         rec.put(i - 1, null);
 
-                    } else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
+                    } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) {
                         // bytes requires little bit different handling
                         byte[] bytes = rs.getBytes(i);
                         ByteBuffer bb = ByteBuffer.wrap(bytes);
@@ -104,7 +105,7 @@ public class JdbcCommon {
                         // The different types that we support are numbers (int, long, double, float),
                         // as well as boolean values and Strings. Since Avro doesn't provide
                         // timestamp types, we want to convert those to Strings. So we will cast anything other
-                        // than numbers or booleans to strings by using to toString() method.
+                        // than numbers or booleans to strings by using the toString() method.
                         rec.put(i - 1, value.toString());
                     }
                 }
@@ -137,9 +138,10 @@ public class JdbcCommon {
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
+                case BIT:
                 case BOOLEAN:
                     builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
-                   break;
+                    break;
 
                 case INTEGER:
                 case SMALLINT:

http://git-wip-us.apache.org/repos/asf/nifi/blob/bd506b1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index f54d4ba..9c9532f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -24,19 +24,26 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestJdbcCommon {
 
@@ -138,6 +145,41 @@ public class TestJdbcCommon {
         }
     }
 
+
+    // Verifies that createSchema accepts every java.sql.Types code other than the ignored ones.
+    @Test
+    public void testCreateSchemaTypes() throws SQLException, IllegalArgumentException, IllegalAccessException {
+        final Set<Integer> fieldsToIgnore = new HashSet<>();
+        fieldsToIgnore.add(Types.NULL);
+        fieldsToIgnore.add(Types.OTHER);
+
+        for (final Field field : Types.class.getFields()) {
+            final int type = field.getInt(null);
+
+            // Skip only the ignored codes; testing a constant here would skip every type.
+            if (fieldsToIgnore.contains(type)) {
+                continue;
+            }
+
+            final ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
+            Mockito.when(metadata.getColumnCount()).thenReturn(1);
+            Mockito.when(metadata.getColumnType(1)).thenReturn(type);
+            Mockito.when(metadata.getColumnName(1)).thenReturn(field.getName());
+            Mockito.when(metadata.getTableName(1)).thenReturn("table");
+
+            final ResultSet rs = Mockito.mock(ResultSet.class);
+            Mockito.when(rs.getMetaData()).thenReturn(metadata);
+
+            try {
+                JdbcCommon.createSchema(rs);
+            } catch (final IllegalArgumentException | SQLException e) {
+                e.printStackTrace();
+                Assert.fail("Failed when using type " + field.getName());
+            }
+        }
+    }
+
+
     // many test use Derby as database, so ensure driver is available
     @Test
     public void testDriverLoad() throws ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bd506b1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
index cf3d0c6..fc2bccd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -64,10 +64,6 @@ public class TestJdbcTypesDerby {
             + "  active tinyint NOT NULL DEFAULT 0, "
             + "  home_module_id int DEFAULT NULL, "
             + "   PRIMARY KEY (id) ) " ;
-//            + "   UNIQUE email ) " ;
-//            + "   KEY home_module_id (home_module_id) ) " ;
-//            + "   CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES "
-//            + "  modules (id) ON DELETE SET NULL " ;
 
     String dropTable = "drop table users";