You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/25 16:03:35 UTC
[35/41] nifi git commit: NIFI-972 ExecuteSQL bug in createSchema()
create Avro Schema 1
NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1
Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ba3225fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ba3225fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ba3225fe
Branch: refs/heads/NIFI-810-InputRequirement
Commit: ba3225fe92258a6aca3cb706412ab62955914dc8
Parents: da28b81
Author: Toivo Adams <to...@gmail.com>
Authored: Thu Oct 1 17:22:08 2015 +0300
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:28:03 2015 -0400
----------------------------------------------------------------------
.../nifi-standard-processors/pom.xml | 9 ++
.../processors/standard/util/JdbcCommon.java | 77 ++++++++--
.../standard/util/TestJdbcTypesDerby.java | 137 +++++++++++++++++
.../standard/util/TestJdbcTypesH2.java | 149 +++++++++++++++++++
4 files changed, 357 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d94981..b0b3afa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -190,6 +190,15 @@ language governing permissions and limitations under the License. -->
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.4.187</version>
+ <scope>test</scope>
+ </dependency>
+
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 6fc69ff..de3d5d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,15 +16,20 @@
*/
package org.apache.nifi.processors.standard.util;
+import static java.sql.Types.ARRAY;
import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
import static java.sql.Types.BOOLEAN;
import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
import static java.sql.Types.DATE;
import static java.sql.Types.DECIMAL;
import static java.sql.Types.DOUBLE;
import static java.sql.Types.FLOAT;
import static java.sql.Types.INTEGER;
import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NUMERIC;
@@ -35,10 +40,12 @@ import static java.sql.Types.SMALLINT;
import static java.sql.Types.TIME;
import static java.sql.Types.TIMESTAMP;
import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
import static java.sql.Types.VARCHAR;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -70,17 +77,34 @@ public class JdbcCommon {
long nrOfRows = 0;
while (rs.next()) {
for (int i = 1; i <= nrOfColumns; i++) {
+ final int javaSqlType = meta.getColumnType(i);
final Object value = rs.getObject(i);
- // The different types that we support are numbers (int, long, double, float),
- // as well as boolean values and Strings. Since Avro doesn't provide
- // timestamp types, we want to convert those to Strings. So we will cast anything other
- // than numbers or booleans to strings by using to toString() method.
if (value == null) {
rec.put(i - 1, null);
+
+ } else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
+ // bytes requires little bit different handling
+ byte[] bytes = rs.getBytes(i);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ rec.put(i - 1, bb);
+
+ } else if (value instanceof Byte) {
+ // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
+ // But value is returned by JDBC as java.lang.Byte
+ // (at least H2 JDBC works this way)
+ // direct put to avro record results:
+ // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+ rec.put(i - 1, ((Byte) value).intValue());
+
} else if (value instanceof Number || value instanceof Boolean) {
rec.put(i - 1, value);
+
} else {
+ // The different types that we support are numbers (int, long, double, float),
+ // as well as boolean values and Strings. Since Avro doesn't provide
+ // timestamp types, we want to convert those to Strings. So we will cast anything other
+ // than numbers or booleans to strings by using to toString() method.
rec.put(i - 1, value.toString());
}
}
@@ -110,53 +134,76 @@ public class JdbcCommon {
case NCHAR:
case NVARCHAR:
case VARCHAR:
- builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+// builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+// builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+
+
break;
case BOOLEAN:
- builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
- break;
+// builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+ break;
case INTEGER:
case SMALLINT:
case TINYINT:
- builder.name(meta.getColumnName(i)).type().intType().noDefault();
+// builder.name(meta.getColumnName(i)).type().intType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
break;
case BIGINT:
- builder.name(meta.getColumnName(i)).type().longType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
break;
// java.sql.RowId is interface, is seems to be database
// implementation specific, let's convert to String
case ROWID:
- builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
case FLOAT:
case REAL:
- builder.name(meta.getColumnName(i)).type().floatType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
break;
case DOUBLE:
- builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DECIMAL:
case NUMERIC:
- builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DATE:
case TIME:
case TIMESTAMP:
- builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
- default:
+ case BINARY:
+ case VARBINARY:
+ case LONGVARBINARY:
+ case ARRAY:
+ case BLOB:
+ case CLOB:
+ builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
break;
+
+
+ default:
+ throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
new file mode 100644
index 0000000..cf3d0c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Useless test, Derby is so much different from MySQL
+ * so it is impossible reproduce problems with MySQL.
+ *
+ *
+ */
+@Ignore
+public class TestJdbcTypesDerby {
+
+ final static String DB_LOCATION = "target/db";
+
+ @BeforeClass
+ public static void setup() {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ }
+
+ String createTable = "create table users ("
+ + " id int NOT NULL GENERATED ALWAYS AS IDENTITY, "
+ + " email varchar(255) NOT NULL UNIQUE, "
+ + " password varchar(255) DEFAULT NULL, "
+ + " activation_code varchar(255) DEFAULT NULL, "
+ + " forgotten_password_code varchar(255) DEFAULT NULL, "
+ + " forgotten_password_time datetime DEFAULT NULL, "
+ + " created datetime NOT NULL, "
+ + " active tinyint NOT NULL DEFAULT 0, "
+ + " home_module_id int DEFAULT NULL, "
+ + " PRIMARY KEY (id) ) " ;
+// + " UNIQUE email ) " ;
+// + " KEY home_module_id (home_module_id) ) " ;
+// + " CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES "
+// + " modules (id) ON DELETE SET NULL " ;
+
+ String dropTable = "drop table users";
+
+ @Test
+ public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ final Connection con = createConnection();
+ final Statement st = con.createStatement();
+
+ try {
+ st.executeUpdate(dropTable);
+ } catch (final Exception e) {
+ // table may not exist, this is not serious problem.
+ }
+
+ st.executeUpdate(createTable);
+
+ st.executeUpdate("insert into users (email, password, activation_code, created, active) "
+ + " values ('robert.gates@cold.com', '******', 'CAS', '2005-12-09', 'Y')");
+
+ final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+
+ final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+ final byte[] serializedBytes = outStream.toByteArray();
+ assertNotNull(serializedBytes);
+ System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+ st.close();
+ con.close();
+
+ // Deserialize bytes to records
+
+ final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+ final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+ try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+ GenericRecord record = null;
+ while (dataFileReader.hasNext()) {
+ // Reuse record object by passing it to next(). This saves us from
+ // allocating and garbage collecting many objects for files with
+ // many items.
+ record = dataFileReader.next(record);
+ System.out.println(record);
+ }
+ }
+ }
+
+ // many test use Derby as database, so ensure driver is available
+ @Test
+ public void testDriverLoad() throws ClassNotFoundException {
+ final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ assertNotNull(clazz);
+ }
+
+ private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+ return con;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
new file mode 100644
index 0000000..e3041b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJdbcTypesH2 {
+
+ final static String DB_LOCATION = "~/var/test/h2";
+
+ @BeforeClass
+ public static void setup() {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ }
+
+ String createTable = " CREATE TABLE `users` ( "
+ + " `id` int(11) NOT NULL AUTO_INCREMENT, "
+ + " `email` varchar(255) NOT NULL, "
+ + " `password` varchar(255) DEFAULT NULL, "
+ + " `activation_code` varchar(255) DEFAULT NULL, "
+ + " `forgotten_password_code` varchar(255) DEFAULT NULL, "
+ + " `forgotten_password_time` datetime DEFAULT NULL, "
+ + " `created` datetime NOT NULL, "
+ + " `active` tinyint(1) NOT NULL DEFAULT '0', "
+ + " `home_module_id` int(11) DEFAULT NULL, "
+
+ + " somebinary BINARY default null, "
+ + " somebinary2 VARBINARY default null, "
+ + " somebinary3 LONGVARBINARY default null, "
+ + " somearray ARRAY default null, "
+ + " someblob BLOB default null, "
+ + " someclob CLOB default null, "
+
+ + " PRIMARY KEY (`id`), "
+ + " UNIQUE KEY `email` (`email`) ) " ;
+// + " KEY `home_module_id` (`home_module_id`) )" ;
+/* + " CONSTRAINT `users_ibfk_1` FOREIGN KEY (`home_module_id`) REFERENCES "
+ + "`modules` (`id`) ON DELETE SET NULL "
+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 " ;
+ */
+
+ String dropTable = "drop table users";
+
+ @Test
+ public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ final Connection con = createConnection();
+ final Statement st = con.createStatement();
+
+ try {
+ st.executeUpdate(dropTable);
+ } catch (final Exception e) {
+ // table may not exist, this is not serious problem.
+ }
+
+ st.executeUpdate(createTable);
+
+// st.executeUpdate("insert into users (email, password, activation_code, forgotten_password_code, forgotten_password_time, created, active, home_module_id) "
+// + " values ('robert.gates@cold.com', '******', 'CAS', 'ounou', '2005-12-09', '2005-12-03', 1, 5)");
+
+ st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) "
+ + " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, '66FF', 'ABDF', 'EE64', 'BB22', 'CC88')");
+
+ final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+// final ResultSet resultSet = st.executeQuery("select U.active from users U");
+// final ResultSet resultSet = st.executeQuery("select U.somebinary from users U");
+
+ final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+ final byte[] serializedBytes = outStream.toByteArray();
+ assertNotNull(serializedBytes);
+ System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+ st.close();
+ con.close();
+
+ // Deserialize bytes to records
+
+ final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+ final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+ try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+ GenericRecord record = null;
+ while (dataFileReader.hasNext()) {
+ // Reuse record object by passing it to next(). This saves us from
+ // allocating and garbage collecting many objects for files with
+ // many items.
+ record = dataFileReader.next(record);
+ System.out.println(record);
+ }
+ }
+ }
+
+ // verify H2 driver loading and get Connections works
+ @Test
+ public void testDriverLoad() throws ClassNotFoundException, SQLException {
+// final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+
+ Connection con = createConnection();
+
+ assertNotNull(con);
+ con.close();
+ }
+
+ private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+// Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ String connectionString = "jdbc:h2:file:" + DB_LOCATION + "/testdb7";
+ final Connection con = DriverManager.getConnection(connectionString, "SA", "");
+ return con;
+ }
+
+}