You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/03/12 19:40:11 UTC
[nifi] 19/21: NIFI-6062: Add support for BLOB, CLOB, NCLOB in record handling
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.9.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7d8243dc52e1a3aaea1cf3fb10eedd5102b9b5c9
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Feb 22 11:44:18 2019 -0500
NIFI-6062: Add support for BLOB, CLOB, NCLOB in record handling
This closes #3329
Signed-off-by: Mike Thomsen <mi...@gmail.com>
---
.../serialization/record/util/DataTypeUtils.java | 69 +++++++++++++++-
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 16 +++-
.../processors/standard/TestExecuteSQLRecord.java | 94 +++++++++++++++++++++-
3 files changed, 173 insertions(+), 6 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index d6e4878..a399f67 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -31,9 +31,13 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.InputStream;
+import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
+import java.sql.Clob;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@@ -551,7 +555,28 @@ public class DataTypeUtils {
return list.toArray();
}
- throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
+ try {
+ if (value instanceof Blob) {
+ Blob blob = (Blob) value;
+ long rawBlobLength = blob.length();
+ if(rawBlobLength > Integer.MAX_VALUE) {
+ throw new IllegalTypeConversionException("Value of type " + value.getClass() + " too large to convert to Object Array for field " + fieldName);
+ }
+ int blobLength = (int) rawBlobLength;
+ byte[] src = blob.getBytes(1, blobLength);
+ Byte[] dest = new Byte[blobLength];
+ for (int i = 0; i < src.length; i++) {
+ dest[i] = src[i];
+ }
+ return dest;
+ } else {
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
+ }
+ } catch (IllegalTypeConversionException itce) {
+ throw itce;
+ } catch (Exception e) {
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName, e);
+ }
}
public static boolean isArrayTypeCompatible(final Object value, final DataType elementDataType) {
@@ -746,6 +771,20 @@ public class DataTypeUtils {
return ""; // Empty array = empty string
}
}
+ if (value instanceof Clob) {
+ Clob clob = (Clob) value;
+ StringBuilder sb = new StringBuilder();
+ char[] buffer = new char[32 * 1024]; // 32K default buffer
+ try (Reader reader = clob.getCharacterStream()) {
+ int charsRead;
+ while ((charsRead = reader.read(buffer)) != -1) {
+ sb.append(buffer, 0, charsRead);
+ }
+ return sb.toString();
+ } catch (Exception e) {
+ throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
+ }
+ }
return value.toString();
}
@@ -788,6 +827,34 @@ public class DataTypeUtils {
if (value instanceof java.util.Date) {
return getDateFormat(format).format((java.util.Date) value);
}
+ if (value instanceof Blob) {
+ Blob blob = (Blob) value;
+ StringBuilder sb = new StringBuilder();
+ byte[] buffer = new byte[32 * 1024]; // 32K default buffer
+ try (InputStream inStream = blob.getBinaryStream()) {
+ int bytesRead;
+ while ((bytesRead = inStream.read(buffer)) != -1) {
+ sb.append(new String(buffer, charset), 0, bytesRead);
+ }
+ return sb.toString();
+ } catch (Exception e) {
+ throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
+ }
+ }
+ if (value instanceof Clob) {
+ Clob clob = (Clob) value;
+ StringBuilder sb = new StringBuilder();
+ char[] buffer = new char[32 * 1024]; // 32K default buffer
+ try (Reader reader = clob.getCharacterStream()) {
+ int charsRead;
+ while ((charsRead = reader.read(buffer)) != -1) {
+ sb.append(buffer, 0, charsRead);
+ }
+ return sb.toString();
+ } catch (Exception e) {
+ throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + value.getClass() + " to a valid String", e);
+ }
+ }
if (value instanceof Object[]) {
return Arrays.toString((Object[]) value);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 4b13226..097844a 100755
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -55,6 +56,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
@@ -726,8 +728,18 @@ public class AvroTypeUtil {
}
if (rawValue instanceof Object[]) {
return AvroTypeUtil.convertByteArray((Object[]) rawValue);
- } else {
- throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
+ }
+ try {
+ if (rawValue instanceof Blob) {
+ Blob blob = (Blob) rawValue;
+ return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream()));
+ } else {
+ throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
+ }
+ } catch (IllegalTypeConversionException itce) {
+ throw itce;
+ } catch (Exception e) {
+ throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e);
}
case MAP:
if (rawValue instanceof Record) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 3d172ca..6f6a091 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -24,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -35,8 +43,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -47,7 +57,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
@@ -102,7 +115,7 @@ public class TestExecuteSQLRecord {
@Before
public void setup() throws InitializationException {
- final DBCPService dbcp = new DBCPServiceSimpleImpl();
+ final DBCPService dbcp = new DBCPServiceSimpleImpl("derby");
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
@@ -340,6 +353,69 @@ public class TestExecuteSQLRecord {
}
@Test
+ public void testWriteLOBsToAvro() throws Exception {
+ final DBCPService dbcp = new DBCPServiceSimpleImpl("h2");
+ final Map<String, String> dbcpProperties = new HashMap<>();
+
+ runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, image blob(1K), words clob(1K), "
+ + "natwords nclob(1K), constraint my_pk primary key (id))");
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2, image, words, natwords) VALUES (0, NULL, 1, CAST (X'DEADBEEF' AS BLOB), "
+ + "CAST ('Hello World' AS CLOB), CAST ('I am an NCLOB' AS NCLOB))");
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+ AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "1");
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(flowFile.toByteArray());
+ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(bais, new GenericDatumReader<>());
+ final Schema avroSchema = dataFileStream.getSchema();
+ GenericData.setStringType(avroSchema, GenericData.StringType.String);
+ final GenericRecord avroRecord = dataFileStream.next();
+
+ Object imageObj = avroRecord.get("IMAGE");
+ assertNotNull(imageObj);
+ assertTrue(imageObj instanceof ByteBuffer);
+ assertArrayEquals(new byte[]{(byte) 0xDE, (byte) 0xAD, (byte) 0xBE, (byte) 0xEF}, ((ByteBuffer) imageObj).array());
+
+ Object wordsObj = avroRecord.get("WORDS");
+ assertNotNull(wordsObj);
+ assertTrue(wordsObj instanceof Utf8);
+ assertEquals("Hello World", wordsObj.toString());
+
+ Object natwordsObj = avroRecord.get("NATWORDS");
+ assertNotNull(natwordsObj);
+ assertTrue(natwordsObj instanceof Utf8);
+ assertEquals("I am an NCLOB", natwordsObj.toString());
+ }
+
+ @Test
public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
@@ -631,6 +707,13 @@ public class TestExecuteSQLRecord {
*/
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+ private final String type;
+
+ public DBCPServiceSimpleImpl(String type) {
+ this.type = type;
+
+ }
+
@Override
public String getIdentifier() {
return "dbcp";
@@ -639,8 +722,13 @@ public class TestExecuteSQLRecord {
@Override
public Connection getConnection() throws ProcessException {
try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+ final Connection con;
+ if ("h2".equalsIgnoreCase(type)) {
+ con = DriverManager.getConnection("jdbc:h2:file:" + "./target/testdb7");
+ } else {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+ }
return con;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);