You are viewing a plain-text version of this message; the canonical version is in the commits@nifi.apache.org mail archive.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/23 16:13:58 UTC

[2/5] nifi git commit: NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ba3225fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ba3225fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ba3225fe

Branch: refs/heads/master
Commit: ba3225fe92258a6aca3cb706412ab62955914dc8
Parents: da28b81
Author: Toivo Adams <to...@gmail.com>
Authored: Thu Oct 1 17:22:08 2015 +0300
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Oct 23 09:28:03 2015 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   9 ++
 .../processors/standard/util/JdbcCommon.java    |  77 ++++++++--
 .../standard/util/TestJdbcTypesDerby.java       | 137 +++++++++++++++++
 .../standard/util/TestJdbcTypesH2.java          | 149 +++++++++++++++++++
 4 files changed, 357 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d94981..b0b3afa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -190,6 +190,15 @@ language governing permissions and limitations under the License. -->
             <artifactId>derby</artifactId>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.187</version>
+            <scope>test</scope>
+        </dependency>
+              
+        
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 6fc69ff..de3d5d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,15 +16,20 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
 import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
 import static java.sql.Types.DATE;
 import static java.sql.Types.DECIMAL;
 import static java.sql.Types.DOUBLE;
 import static java.sql.Types.FLOAT;
 import static java.sql.Types.INTEGER;
 import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
 import static java.sql.Types.LONGVARCHAR;
 import static java.sql.Types.NCHAR;
 import static java.sql.Types.NUMERIC;
@@ -35,10 +40,12 @@ import static java.sql.Types.SMALLINT;
 import static java.sql.Types.TIME;
 import static java.sql.Types.TIMESTAMP;
 import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
 import static java.sql.Types.VARCHAR;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -70,17 +77,34 @@ public class JdbcCommon {
             long nrOfRows = 0;
             while (rs.next()) {
                 for (int i = 1; i <= nrOfColumns; i++) {
+                    final int javaSqlType = meta.getColumnType(i);
                     final Object value = rs.getObject(i);
 
-                    // The different types that we support are numbers (int, long, double, float),
-                    // as well as boolean values and Strings. Since Avro doesn't provide
-                    // timestamp types, we want to convert those to Strings. So we will cast anything other
-                    // than numbers or booleans to strings by using to toString() method.
                     if (value == null) {
                         rec.put(i - 1, null);
+
+                    } else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
+                        // bytes requires little bit different handling
+                        byte[] bytes = rs.getBytes(i);
+                        ByteBuffer bb = ByteBuffer.wrap(bytes);
+                        rec.put(i - 1, bb);
+
+                    } else if (value instanceof Byte) {
+                        // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
+                        // But value is returned by JDBC as java.lang.Byte
+                        // (at least H2 JDBC works this way)
+                        // direct put to avro record results:
+                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+                        rec.put(i - 1, ((Byte) value).intValue());
+
                     } else if (value instanceof Number || value instanceof Boolean) {
                         rec.put(i - 1, value);
+
                     } else {
+                        // The different types that we support are numbers (int, long, double, float),
+                        // as well as boolean values and Strings. Since Avro doesn't provide
+                        // timestamp types, we want to convert those to Strings. So we will cast anything other
+                        // than numbers or booleans to strings by using to toString() method.
                         rec.put(i - 1, value.toString());
                     }
                 }
@@ -110,53 +134,76 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+
+
                     break;
 
                 case BOOLEAN:
-                    builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
-                    break;
+//                    builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+                   break;
 
                 case INTEGER:
                 case SMALLINT:
                 case TINYINT:
-                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
                     break;
 
                 case BIGINT:
-                    builder.name(meta.getColumnName(i)).type().longType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
                     break;
 
                 // java.sql.RowId is interface, is seems to be database
                 // implementation specific, let's convert to String
                 case ROWID:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 case FLOAT:
                 case REAL:
-                    builder.name(meta.getColumnName(i)).type().floatType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
                     break;
 
                 case DOUBLE:
-                    builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DATE:
                 case TIME:
                 case TIMESTAMP:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
 
-                default:
+                case BINARY:
+                case VARBINARY:
+                case LONGVARBINARY:
+                case ARRAY:
+                case BLOB:
+                case CLOB:
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
                     break;
+
+
+                default:
+                    throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
new file mode 100644
index 0000000..cf3d0c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Pushes a Derby result set through {@link JdbcCommon#convertToAvroStream} and prints the records.
+ * Ignored: Derby's SQL dialect differs too much from MySQL's to reproduce the MySQL problems this
+ * was written for (see TestJdbcTypesH2).
+ * NOTE(review): the only assertion is that the serialized output is non-null.
+ */
+@Ignore
+public class TestJdbcTypesDerby {
+
+    final static String DB_LOCATION = "target/db";
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    String createTable = "create table users ("
+            + "  id int NOT NULL GENERATED ALWAYS AS IDENTITY, "
+            + "  email varchar(255) NOT NULL UNIQUE, "
+            + "  password varchar(255) DEFAULT NULL, "
+            + "  activation_code varchar(255) DEFAULT NULL, "
+            + "  forgotten_password_code varchar(255) DEFAULT NULL, "
+            + "  forgotten_password_time datetime DEFAULT NULL, "
+            + "  created datetime NOT NULL, "
+            + "  active tinyint NOT NULL DEFAULT 0, "
+            + "  home_module_id int DEFAULT NULL, "
+            + "   PRIMARY KEY (id) ) " ;
+    // MySQL-only clauses (UNIQUE/KEY index definitions, FOREIGN KEY ... ON DELETE SET NULL)
+    // are omitted from this Derby version of the table.
+    // NOTE(review): 'datetime' and 'tinyint' do not appear to be Derby column types, and the
+    // insert below stores 'Y' into 'active'; this DDL/DML likely fails if the test is enabled.
+
+    String dropTable = "drop table users";
+
+    @Test
+    public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+        // Best-effort cleanup: File.delete() returns false (and is ignored) for a non-empty dir.
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        final Connection con = createConnection();
+        final Statement st = con.createStatement();
+
+        try {
+            st.executeUpdate(dropTable);
+        } catch (final Exception e) {
+            // The table may not exist yet on a fresh database, so the failure is ignored.
+        }
+
+        st.executeUpdate(createTable);
+
+        st.executeUpdate("insert into users (email, password, activation_code, created, active) "
+                           + " values ('robert.gates@cold.com', '******', 'CAS', '2005-12-09', 'Y')");
+
+        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+        final byte[] serializedBytes = outStream.toByteArray();
+        assertNotNull(serializedBytes);
+        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+        st.close();
+        con.close();
+
+        // Read the Avro container back and print each record for manual inspection.
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // Many tests use Derby, so check that its embedded driver class can be loaded.
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException {
+        final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        assertNotNull(clazz);
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+        // ";create=true" makes Derby create the database at DB_LOCATION if it does not exist.
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+        return con;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ba3225fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
new file mode 100644
index 0000000..e3041b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+/** Round-trips an H2 table with MySQL-style DDL and binary/LOB columns through JdbcCommon into Avro. */
+public class TestJdbcTypesH2 {
+    // H2 resolves "~" in the JDBC URL to the user's home directory, so the DB lives outside target/.
+    final static String DB_LOCATION = "~/var/test/h2";
+    // NOTE(review): derby.stream.error.file has no effect on H2; apparently copied from the Derby test.
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    String createTable = "    CREATE TABLE `users` ( "
+            + "  `id` int(11) NOT NULL AUTO_INCREMENT, "
+            + "  `email` varchar(255) NOT NULL, "
+            + "  `password` varchar(255) DEFAULT NULL, "
+            + "  `activation_code` varchar(255) DEFAULT NULL, "
+            + "  `forgotten_password_code` varchar(255) DEFAULT NULL, "
+            + "  `forgotten_password_time` datetime DEFAULT NULL, "
+            + "  `created` datetime NOT NULL, "
+            + "  `active` tinyint(1) NOT NULL DEFAULT '0', "
+            + "  `home_module_id` int(11) DEFAULT NULL, "
+            // Binary, array and LOB columns, which JdbcCommon maps to nullable Avro bytes.
+            + "  somebinary BINARY default null, "
+            + "  somebinary2 VARBINARY default null, "
+            + "  somebinary3 LONGVARBINARY default null, "
+            + "  somearray   ARRAY default null, "
+            + "  someblob BLOB default null, "
+            + "  someclob CLOB default null, "
+
+            + "  PRIMARY KEY (`id`), "
+            + "  UNIQUE KEY `email` (`email`) ) " ;
+    // The MySQL-only tail of the original DDL (a KEY index, a FOREIGN KEY ... ON DELETE SET NULL
+    // constraint, ENGINE=InnoDB DEFAULT CHARSET=utf8) is omitted from this H2 definition.
+    // 'active' is tinyint(1): per the JdbcCommon comments, H2 returns its values as java.lang.Byte,
+    // which JdbcCommon converts to int for Avro (the insert below stores 3 into it).
+
+
+    String dropTable = "drop table users";
+
+    @Test
+    public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
+        // NOTE(review): effectively a no-op: java.io.File does not expand "~" and won't delete a non-empty dir.
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        final Connection con = createConnection();
+        final Statement st = con.createStatement();
+
+        try {
+            st.executeUpdate(dropTable);
+        } catch (final Exception e) {
+            // The table may not exist yet on a fresh database, so the failure is ignored.
+        }
+
+        st.executeUpdate(createTable);
+
+        // Insert one row with the binary/LOB columns populated; 'somearray' and the other
+        // nullable columns are left NULL.
+
+        st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) "
+                + " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, '66FF', 'ABDF', 'EE64', 'BB22', 'CC88')");
+
+        final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
+        // ROW_NUMBER() adds a computed column (rownr) that does not come from the table.
+        // The result set is closed implicitly by st.close() below.
+
+        final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+        final byte[] serializedBytes = outStream.toByteArray();
+        assertNotNull(serializedBytes);
+        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
+
+        st.close();
+        con.close();
+
+        // Read the Avro container back and print each record for manual inspection.
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // Verifies that the H2 driver can be found and a connection opened.
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException, SQLException {
+        // No Class.forName: DriverManager finds the H2 driver via JDBC 4 service-loader registration.
+
+        Connection con = createConnection();
+
+        assertNotNull(con);
+        con.close();
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+        // Opens a file-based H2 database "testdb7" under DB_LOCATION, creating it on first use.
+        String connectionString = "jdbc:h2:file:" + DB_LOCATION + "/testdb7";
+        final Connection con = DriverManager.getConnection(connectionString, "SA", "");
+        return con;
+    }
+
+}
+}