You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/10/26 07:00:41 UTC

apex-malhar git commit: APEXMALHAR-2290 fix to optimize the function which was populating meta data for columns

Repository: apex-malhar
Updated Branches:
  refs/heads/master 87d8aef7c -> c3d3a880d


APEXMALHAR-2290 fix to optimize the function which was populating meta data for columns


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c3d3a880
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c3d3a880
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c3d3a880

Branch: refs/heads/master
Commit: c3d3a880d17181b628d28874545f5ba34effefd0
Parents: 87d8aef
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Wed Oct 12 20:30:22 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Oct 26 12:09:02 2016 +0530

----------------------------------------------------------------------
 .../db/jdbc/JdbcPOJOInsertOutputOperator.java   | 71 +++++++++++++-------
 1 file changed, 45 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3d3a880/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
index f5e6081..706757a 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -19,11 +19,11 @@
 package com.datatorrent.lib.db.jdbc;
 
 import java.lang.reflect.Field;
+import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.Types;
+import java.util.HashSet;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -60,23 +60,31 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
 
     // Populate columnNames and columnDataTypes
     try {
+      columnNames = Lists.newArrayList();
+      columnDataTypes = Lists.newArrayList();
+      columnNullabilities = Lists.newArrayList();
+      /**
+       * columnNamesSet is the set having column names given by the user
+       */
+      HashSet<String> columnNamesSet = new HashSet<>();
       if (getFieldInfos() == null) { // then assume direct mapping
         LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns");
-        populateColumnDataTypes(null);
       } else {
         // FieldInfo supplied by user
         StringBuilder columns = new StringBuilder();
         StringBuilder values = new StringBuilder();
         for (int i = 0; i < getFieldInfos().size(); i++) {
-          columns.append(getFieldInfos().get(i).getColumnName());
+          String columnName = getFieldInfos().get(i).getColumnName();
+          columns.append(columnName);
           values.append("?");
           if (i < getFieldInfos().size() - 1) {
             columns.append(",");
             values.append(",");
           }
+          columnNamesSet.add(columnName.toUpperCase());
         }
-        populateColumnDataTypes(columns.toString());
       }
+      populateColumnDataTypes(columnNamesSet);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -93,7 +101,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
         String columnName = columnNames.get(i);
         String pojoField = getMatchingField(fields, columnName);
 
-        if (columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
+        if (columnNullabilities.get(i) == DatabaseMetaData.columnNoNulls &&
             (pojoField == null || pojoField.length() == 0)) {
           throw new RuntimeException("Data for a non-nullable field: " + columnName + " not found in POJO");
         } else {
@@ -146,28 +154,39 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
     return null;
   }
 
-  protected void populateColumnDataTypes(String columns) throws SQLException
+  /**
+   * Function to populate Meta Data.
+   * @param columnNamesSet is a set having column names given by the user
+   * @throws SQLException
+   */
+  protected void populateColumnDataTypes(HashSet<String> columnNamesSet) throws SQLException
   {
-    columnNames = Lists.newArrayList();
-    columnDataTypes = Lists.newArrayList();
-    columnNullabilities = Lists.newArrayList();
-
-    try (Statement st = store.getConnection().createStatement()) {
-      if (columns == null || columns.length() == 0) {
-        columns = "*";
+    ResultSet rsColumns;
+    DatabaseMetaData meta = store.getConnection().getMetaData();
+    /**Identifiers (table names, column names etc.) may be stored internally in either uppercase or lowercase.**/
+    rsColumns = meta.getColumns(null, null, getTablename().toUpperCase(), null);
+    if (!rsColumns.isBeforeFirst()) {
+      rsColumns = meta.getColumns(null, null, getTablename().toLowerCase(), null);
+      if (!rsColumns.isBeforeFirst()) {
+        /** If the table name is in quotes then some Databases store it without doing any uppercase or lowercase conversions */
+        rsColumns = meta.getColumns(null, null, getTablename(), null);
+        if (!rsColumns.isBeforeFirst()) {
+          throw new RuntimeException("Table name not found");
+        }
       }
-      ResultSet rs = st.executeQuery("select " + columns + " from " + getTablename());
-
-      ResultSetMetaData rsMetaData = rs.getMetaData();
-      LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
-
-      for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
-        int type = rsMetaData.getColumnType(i);
-        String columnName = rsMetaData.getColumnName(i);
-        columnNames.add(columnName);
-        columnDataTypes.add(type);
-        columnNullabilities.add(rsMetaData.isNullable(i));
-        LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
+    }
+    boolean readAllColumns = columnNamesSet.size() == 0 ? true : false;
+    int remainingColumns = columnNamesSet.size();
+    while (rsColumns.next()) {
+      if (readAllColumns || remainingColumns > 0) {
+        if (readAllColumns || columnNamesSet.contains(rsColumns.getString("COLUMN_NAME").toUpperCase())) {
+          columnNames.add(rsColumns.getString("COLUMN_NAME"));
+          columnNullabilities.add(rsColumns.getInt("NULLABLE"));
+          columnDataTypes.add(rsColumns.getInt("DATA_TYPE"));
+          remainingColumns--;
+        }
+      } else {
+        break;
       }
     }
   }