You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/05/04 19:58:50 UTC

[nifi] branch main updated: NIFI-6061: Fix CLOB/BLOB handling in PutDatabaseRecord

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 30fc266  NIFI-6061: Fix CLOB/BLOB handling in PutDatabaseRecord
30fc266 is described below

commit 30fc26647e06a9a38b95ac050722bf752bb392b0
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon May 3 12:39:49 2021 -0400

    NIFI-6061: Fix CLOB/BLOB handling in PutDatabaseRecord
    
    NIFI-6061: Force getBytes() in BLOB handling to use UTF-8 charset
    
    NIFI-6061: Use setClob(), added unit tests, incorporated review comments
    
    This closes #5049
    
    Co-authored-by: zhangcheng <zh...@foxmail.com>
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../serialization/record/util/DataTypeUtils.java   |   4 +
 .../serialization/record/TestDataTypeUtils.java    |   6 +
 .../processors/standard/PutDatabaseRecord.java     |  94 ++++++++++-
 .../standard/TestPutDatabaseRecord.groovy          | 175 ++++++++++++++++++++-
 4 files changed, 270 insertions(+), 9 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index eef9469..cd3cd4f 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1995,6 +1995,7 @@ public class DataTypeUtils {
             case Types.NVARCHAR:
             case Types.OTHER:
             case Types.SQLXML:
+            case Types.CLOB:
                 return RecordFieldType.STRING.getDataType();
             case Types.TIME:
                 return RecordFieldType.TIME.getDataType();
@@ -2002,6 +2003,9 @@ public class DataTypeUtils {
                 return RecordFieldType.TIMESTAMP.getDataType();
             case Types.ARRAY:
                 return RecordFieldType.ARRAY.getDataType();
+            case Types.BINARY:
+            case Types.BLOB:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
             case Types.STRUCT:
                 return RecordFieldType.RECORD.getDataType();
             default:
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index a493609..4745abe 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -363,6 +363,12 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testGetDataTypeFromSQLTypeValue() {
+        assertEquals(RecordFieldType.STRING.getDataType(), DataTypeUtils.getDataTypeFromSQLTypeValue(Types.CLOB));
+        assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), DataTypeUtils.getDataTypeFromSQLTypeValue(Types.BLOB));
+    }
+
+    @Test
     public void testChooseDataTypeWhenExpectedIsBigDecimal() {
         // GIVEN
         final List<DataType> dataTypes = Arrays.asList(
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 9b2251f..91407ea 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -59,9 +59,12 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.sql.BatchUpdateException;
+import java.sql.Clob;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.Date;
@@ -721,10 +724,31 @@ public class PutDatabaseRecord extends AbstractProcessor {
                             try {
                                 DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType);
                                 if (targetDataType != null) {
-                                    currentValue = DataTypeUtils.convertType(
-                                            currentValue,
-                                            targetDataType,
-                                            fieldName);
+                                    if (sqlType == Types.BLOB || sqlType == Types.BINARY) {
+                                        if (currentValue instanceof Object[]) {
+                                            // Convert Object[Byte] arrays to byte[]
+                                            Object[] src = (Object[]) currentValue;
+                                            if (src.length > 0) {
+                                                if (!(src[0] instanceof Byte)) {
+                                                    throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY");
+                                                }
+                                            }
+                                            byte[] dest = new byte[src.length];
+                                            for (int j = 0; j < src.length; j++) {
+                                                dest[j] = (Byte) src[j];
+                                            }
+                                            currentValue = dest;
+                                        } else if (currentValue instanceof String) {
+                                            currentValue = ((String) currentValue).getBytes(StandardCharsets.UTF_8);
+                                        } else if (currentValue != null && !(currentValue instanceof byte[])) {
+                                            throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY");
+                                        }
+                                    } else {
+                                        currentValue = DataTypeUtils.convertType(
+                                                currentValue,
+                                                targetDataType,
+                                                fieldName);
+                                    }
                                 }
                             } catch (IllegalTypeConversionException itce) {
                                 // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
@@ -740,15 +764,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                         // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
                         if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                            ps.setObject(i * 2 + 1, currentValue, sqlType);
-                            ps.setObject(i * 2 + 2, currentValue, sqlType);
+                            setParameter(ps, i * 2 + 1, currentValue, fieldSqlType, sqlType);
+                            setParameter(ps, i * 2 + 2, currentValue, fieldSqlType, sqlType);
                         } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
                             final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
                             for (int j = 0; j < timesToAddObjects; j++) {
-                                ps.setObject(i + (fieldIndexes.size() * j) + 1, currentValue, sqlType);
+                                setParameter(ps, i + (fieldIndexes.size() * j) + 1, currentValue, fieldSqlType, sqlType);
                             }
                         } else {
-                            ps.setObject(i + 1, currentValue, sqlType);
+                            setParameter(ps, i + 1, currentValue, fieldSqlType, sqlType);
                         }
                     }
 
@@ -776,6 +800,60 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
     }
 
+    private void setParameter(PreparedStatement ps, int index, Object value, int fieldSqlType, int sqlType) throws IOException {
+        if (sqlType == Types.BLOB) {
+            // Convert Byte[] or String (anything that has been converted to byte[]) into BLOB
+            if (fieldSqlType == Types.ARRAY || fieldSqlType == Types.VARCHAR) {
+                if (!(value instanceof byte[])) {
+                    if (value == null) {
+                        try {
+                            ps.setNull(index, Types.BLOB);
+                            return;
+                        } catch (SQLException e) {
+                            throw new IOException("Unable to setNull() on prepared statement" , e);
+                        }
+                    } else {
+                        throw new IOException("Expected BLOB to be of type byte[] but is instead " + value.getClass().getName());
+                    }
+                }
+                byte[] byteArray = (byte[]) value;
+                try (InputStream inputStream = new ByteArrayInputStream(byteArray)) {
+                    ps.setBlob(index, inputStream);
+                } catch (SQLException e) {
+                    throw new IOException("Unable to parse binary data " + value, e.getCause());
+                }
+            } else {
+                try (InputStream inputStream = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8))) {
+                    ps.setBlob(index, inputStream);
+                } catch (IOException | SQLException e) {
+                    throw new IOException("Unable to parse binary data " + value, e.getCause());
+                }
+            }
+        } else if (sqlType == Types.CLOB) {
+            if (value == null) {
+                try {
+                    ps.setNull(index, Types.CLOB);
+                } catch (SQLException e) {
+                    throw new IOException("Unable to setNull() on prepared statement", e);
+                }
+            } else {
+                try {
+                    Clob clob = ps.getConnection().createClob();
+                    clob.setString(1, value.toString());
+                    ps.setClob(index, clob);
+                } catch (SQLException e) {
+                    throw new IOException("Unable to parse data as CLOB/String " + value, e.getCause());
+                }
+            }
+        } else {
+            try {
+                ps.setObject(index, value, sqlType);
+            } catch (SQLException e) {
+                throw new IOException("Unable to setObject() with value " + value + " at index " + index + " of type " + sqlType , e);
+            }
+        }
+    }
+
     private List<Record> getDataRecords(final Record outerRecord) {
         if (dataRecordPath == null) {
             return Collections.singletonList(outerRecord);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index e3c8e4c..d6eebec 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -38,6 +38,8 @@ import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.JUnit4
 
+import java.sql.Blob
+import java.sql.Clob
 import java.sql.Connection
 import java.sql.Date
 import java.sql.DriverManager
@@ -150,7 +152,6 @@ class TestPutDatabaseRecord {
                 false,
                 ['id'] as Set<String>,
                 ''
-
         ] as PutDatabaseRecord.TableSchema
 
         runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
@@ -1451,4 +1452,176 @@ class TestPutDatabaseRecord {
         stmt.close()
         conn.close()
     }
+
+    @Test
+    void testInsertWithBlobClob() throws Exception {
+        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
+                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"
+
+        recreateTable(createTableWithBlob)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+        // The BLOB content is supplied as a boxed Byte[]; the processor is expected to unbox it to byte[]
+        byte[] bytes = "BLOB".getBytes('UTF-8')
+        Byte[] blobRecordValue = new Byte[bytes.length]
+        (0 .. (bytes.length-1)).each { i -> blobRecordValue[i] = bytes[i].longValue() }
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("content", RecordFieldType.ARRAY)
+
+        parser.addRecord(1, 'rec1', 101, blobRecordValue)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        Clob clob = rs.getClob(2)
+        assertNotNull(clob)
+        char[] clobText = new char[5]
+        int numChars = clob.characterStream.read(clobText)
+        assertEquals(4, numChars)
+        // Ignore the last char: the 5-char buffer is oversized on purpose to prove the CLOB holds exactly 4 chars
+        assertEquals('rec1', new String(clobText).substring(0,4))
+        Blob blob = rs.getBlob(3)
+        assertEquals("BLOB", new String(blob.getBytes(1, blob.length() as int), 'UTF-8'))
+        assertEquals(101, rs.getInt(4))
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
+    void testInsertWithBlobClobObjectArraySource() throws Exception {
+        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
+                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"
+
+        recreateTable(createTableWithBlob)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+        // The BLOB content is supplied as an Object[] of Byte elements rather than a typed Byte[]
+        byte[] bytes = "BLOB".getBytes('UTF-8')
+        Object[] blobRecordValue = new Object[bytes.length]
+        (0 .. (bytes.length-1)).each { i -> blobRecordValue[i] = bytes[i] }
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("content", RecordFieldType.ARRAY)
+
+        parser.addRecord(1, 'rec1', 101, blobRecordValue)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        Clob clob = rs.getClob(2)
+        assertNotNull(clob)
+        char[] clobText = new char[5]
+        int numChars = clob.characterStream.read(clobText)
+        assertEquals(4, numChars)
+        // Ignore the last char: the 5-char buffer is oversized on purpose to prove the CLOB holds exactly 4 chars
+        assertEquals('rec1', new String(clobText).substring(0,4))
+        Blob blob = rs.getBlob(3)
+        assertEquals("BLOB", new String(blob.getBytes(1, blob.length() as int), 'UTF-8'))
+        assertEquals(101, rs.getInt(4))
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
+    void testInsertWithBlobStringSource() throws Exception {
+        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
+                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"
+
+        recreateTable(createTableWithBlob)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+        // The BLOB content is supplied as a String field; it should be stored as that String's UTF-8 bytes
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("content", RecordFieldType.STRING)
+
+        parser.addRecord(1, 'rec1', 101, 'BLOB')
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        Clob clob = rs.getClob(2)
+        assertNotNull(clob)
+        char[] clobText = new char[5]
+        int numChars = clob.characterStream.read(clobText)
+        assertEquals(4, numChars)
+        // Ignore the last char: the 5-char buffer is oversized on purpose to prove the CLOB holds exactly 4 chars
+        assertEquals('rec1', new String(clobText).substring(0,4))
+        Blob blob = rs.getBlob(3)
+        assertEquals("BLOB", new String(blob.getBytes(1, blob.length() as int), 'UTF-8'))
+        assertEquals(101, rs.getInt(4))
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
+    void testInsertWithBlobIntegerArraySource() throws Exception {
+        String createTableWithBlob = "CREATE TABLE PERSONS (id integer primary key, name clob," +
+                "content blob, code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"
+
+        recreateTable(createTableWithBlob)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("content", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()).getFieldType())
+
+        parser.addRecord(1, 'rec1', 101, [1,2,3] as Integer[])
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+    }
 }