You are viewing a plain-text version of this content. Its canonical link was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/03/12 19:40:11 UTC

[nifi] 19/21: NIFI-6062: Add support for BLOB, CLOB, NCLOB in record handling

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.9.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 7d8243dc52e1a3aaea1cf3fb10eedd5102b9b5c9
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Feb 22 11:44:18 2019 -0500

    NIFI-6062: Add support for BLOB, CLOB, NCLOB in record handling
    
    This closes #3329
    
    Signed-off-by: Mike Thomsen <mi...@gmail.com>
---
 .../serialization/record/util/DataTypeUtils.java   | 69 +++++++++++++++-
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    | 16 +++-
 .../processors/standard/TestExecuteSQLRecord.java  | 94 +++++++++++++++++++++-
 3 files changed, 173 insertions(+), 6 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index d6e4878..a399f67 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -31,9 +31,13 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
+import java.io.Reader;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
+import java.sql.Clob;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -551,7 +555,28 @@ public class DataTypeUtils {
             return list.toArray();
         }
 
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
+        try {
+            if (value instanceof Blob) {
+                Blob blob = (Blob) value;
+                long rawBlobLength = blob.length();
+                if(rawBlobLength > Integer.MAX_VALUE) {
+                    throw new IllegalTypeConversionException("Value of type " + value.getClass() + " too large to convert to Object Array for field " + fieldName);
+                }
+                int blobLength = (int) rawBlobLength;
+                byte[] src = blob.getBytes(1, blobLength);
+                Byte[] dest = new Byte[blobLength];
+                for (int i = 0; i < src.length; i++) {
+                    dest[i] = src[i];
+                }
+                return dest;
+            } else {
+                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
+            }
+        } catch (IllegalTypeConversionException itce) {
+            throw itce;
+        } catch (Exception e) {
+            throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName, e);
+        }
     }
 
     public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) {
@@ -746,6 +771,20 @@ public class DataTypeUtils {
                 return ""; // Empty array = empty string
             }
         }
+        if (value instanceof Clob) {
+            Clob clob = (Clob) value;
+            StringBuilder sb = new StringBuilder();
+            char[] buffer = new char[32 * 1024]; // 32K default buffer
+            try (Reader reader = clob.getCharacterStream()) {
+                int charsRead;
+                while ((charsRead = reader.read(buffer)) != -1) {
+                    sb.append(buffer, 0, charsRead);
+                }
+                return sb.toString();
+            } catch (Exception e) {
+                throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
+            }
+        }
 
         return value.toString();
     }
@@ -788,6 +827,34 @@ public class DataTypeUtils {
         if (value instanceof java.util.Date) {
             return getDateFormat(format).format((java.util.Date) value);
         }
+        if (value instanceof Blob) {
+            Blob blob = (Blob) value;
+            StringBuilder sb = new StringBuilder();
+            byte[] buffer = new byte[32 * 1024]; // 32K default buffer
+            try (InputStream inStream = blob.getBinaryStream()) {
+                int bytesRead;
+                while ((bytesRead = inStream.read(buffer)) != -1) {
+                    sb.append(new String(buffer, charset), 0, bytesRead);
+                }
+                return sb.toString();
+            } catch (Exception e) {
+                throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
+            }
+        }
+        if (value instanceof Clob) {
+            Clob clob = (Clob) value;
+            StringBuilder sb = new StringBuilder();
+            char[] buffer = new char[32 * 1024]; // 32K default buffer
+            try (Reader reader = clob.getCharacterStream()) {
+                int charsRead;
+                while ((charsRead = reader.read(buffer)) != -1) {
+                    sb.append(buffer, 0, charsRead);
+                }
+                return sb.toString();
+            } catch (Exception e) {
+                throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
+            }
+        }
 
         if (value instanceof Object[]) {
             return Arrays.toString((Object[]) value);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 4b13226..097844a 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -55,6 +56,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Duration;
@@ -726,8 +728,18 @@ public class AvroTypeUtil {
                 }
                 if (rawValue instanceof Object[]) {
                     return AvroTypeUtil.convertByteArray((Object[]) rawValue);
-                } else {
-                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
+                }
+                try {
+                    if (rawValue instanceof Blob) {
+                        Blob blob = (Blob) rawValue;
+                        return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream()));
+                    } else {
+                        throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
+                    }
+                } catch (IllegalTypeConversionException itce) {
+                    throw itce;
+                } catch (Exception e) {
+                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e);
                 }
             case MAP:
                 if (rawValue instanceof Record) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 3d172ca..6f6a091 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.avro.AvroRecordSetWriter;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -24,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -35,8 +43,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -47,7 +57,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -102,7 +115,7 @@ public class TestExecuteSQLRecord {
 
     @Before
     public void setup() throws InitializationException {
-        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final DBCPService dbcp = new DBCPServiceSimpleImpl("derby");
         final Map<String, String> dbcpProperties = new HashMap<>();
 
         runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
@@ -340,6 +353,69 @@ public class TestExecuteSQLRecord {
     }
 
     @Test
+    public void testWriteLOBsToAvro() throws Exception { // Checks BLOB -> Avro bytes and CLOB/NCLOB -> Avro string
+        final DBCPService dbcp = new DBCPServiceSimpleImpl("h2"); // H2 instead of the Derby service used by setup()
+        final Map<String, String> dbcpProperties = new HashMap<>();
+        // fresh runner so the H2-backed service is the one registered as "dbcp"
+        runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete(); // NOTE(review): DB_LOCATION is the Derby path, not H2's ./target/testdb7, so this does not reset the H2 database
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); // NOTE(review): con and stmt are never closed
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) { // ignored: presumably the table does not exist yet
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, image blob(1K), words clob(1K), "
+                + "natwords nclob(1K), constraint my_pk primary key (id))");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2, image, words, natwords) VALUES (0, NULL, 1, CAST (X'DEADBEEF' AS BLOB), " // X'DEADBEEF' is the 4-byte payload asserted below
+                + "CAST ('Hello World' AS CLOB), CAST ('I am an NCLOB' AS NCLOB))");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "1");
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(flowFile.toByteArray());
+        final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(bais, new GenericDatumReader<>()); // NOTE(review): never closed
+        final Schema avroSchema = dataFileStream.getSchema();
+        GenericData.setStringType(avroSchema, GenericData.StringType.String); // NOTE(review): tags only the top-level record schema; string fields are still asserted as Utf8 below
+        final GenericRecord avroRecord = dataFileStream.next();
+
+        Object imageObj = avroRecord.get("IMAGE");
+        assertNotNull(imageObj);
+        assertTrue(imageObj instanceof ByteBuffer);
+        assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array()); // NOTE(review): array() returns the whole backing array, ignoring position/limit
+
+        Object wordsObj = avroRecord.get("WORDS");
+        assertNotNull(wordsObj);
+        assertTrue(wordsObj instanceof Utf8);
+        assertEquals("Hello World", wordsObj.toString());
+
+        Object natwordsObj = avroRecord.get("NATWORDS");
+        assertNotNull(natwordsObj);
+        assertTrue(natwordsObj instanceof Utf8);
+        assertEquals("I am an NCLOB", natwordsObj.toString());
+    }
+
+    @Test
     public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
@@ -631,6 +707,13 @@ public class TestExecuteSQLRecord {
      */
     class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
 
+        private final String type;
+
+        public DBCPServiceSimpleImpl(String type) {
+            this.type = type;
+
+        }
+
         @Override
         public String getIdentifier() {
             return "dbcp";
@@ -639,8 +722,13 @@ public class TestExecuteSQLRecord {
         @Override
         public Connection getConnection() throws ProcessException {
             try {
-                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+                final Connection con;
+                if ("h2".equalsIgnoreCase(type)) {
+                    con = DriverManager.getConnection("jdbc:h2:file:" + "./target/testdb7");
+                } else {
+                    Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                    con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+                }
                 return con;
             } catch (final Exception e) {
                 throw new ProcessException("getConnection failed: " + e);