You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/10/26 07:00:41 UTC
apex-malhar git commit: APEXMALHAR-2290 fix to optimize the function
which was populating meta data for columns
Repository: apex-malhar
Updated Branches:
refs/heads/master 87d8aef7c -> c3d3a880d
APEXMALHAR-2290 fix to optimize the function which was populating meta data for columns
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c3d3a880
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c3d3a880
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c3d3a880
Branch: refs/heads/master
Commit: c3d3a880d17181b628d28874545f5ba34effefd0
Parents: 87d8aef
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Wed Oct 12 20:30:22 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Oct 26 12:09:02 2016 +0530
----------------------------------------------------------------------
.../db/jdbc/JdbcPOJOInsertOutputOperator.java | 71 +++++++++++++-------
1 file changed, 45 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3d3a880/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
index f5e6081..706757a 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -19,11 +19,11 @@
package com.datatorrent.lib.db.jdbc;
import java.lang.reflect.Field;
+import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.sql.Statement;
import java.sql.Types;
+import java.util.HashSet;
import java.util.List;
import org.slf4j.Logger;
@@ -60,23 +60,31 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
// Populate columnNames and columnDataTypes
try {
+ columnNames = Lists.newArrayList();
+ columnDataTypes = Lists.newArrayList();
+ columnNullabilities = Lists.newArrayList();
+ /**
+ * columnNamesSet is the set having column names given by the user
+ */
+ HashSet<String> columnNamesSet = new HashSet<>();
if (getFieldInfos() == null) { // then assume direct mapping
LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns");
- populateColumnDataTypes(null);
} else {
// FieldInfo supplied by user
StringBuilder columns = new StringBuilder();
StringBuilder values = new StringBuilder();
for (int i = 0; i < getFieldInfos().size(); i++) {
- columns.append(getFieldInfos().get(i).getColumnName());
+ String columnName = getFieldInfos().get(i).getColumnName();
+ columns.append(columnName);
values.append("?");
if (i < getFieldInfos().size() - 1) {
columns.append(",");
values.append(",");
}
+ columnNamesSet.add(columnName.toUpperCase());
}
- populateColumnDataTypes(columns.toString());
}
+ populateColumnDataTypes(columnNamesSet);
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -93,7 +101,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
String columnName = columnNames.get(i);
String pojoField = getMatchingField(fields, columnName);
- if (columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
+ if (columnNullabilities.get(i) == DatabaseMetaData.columnNoNulls &&
(pojoField == null || pojoField.length() == 0)) {
throw new RuntimeException("Data for a non-nullable field: " + columnName + " not found in POJO");
} else {
@@ -146,28 +154,39 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
return null;
}
- protected void populateColumnDataTypes(String columns) throws SQLException
+ /**
+ * Function to populate Meta Data.
+ * @param columnNamesSet is a set having column names given by the user
+ * @throws SQLException
+ */
+ protected void populateColumnDataTypes(HashSet<String> columnNamesSet) throws SQLException
{
- columnNames = Lists.newArrayList();
- columnDataTypes = Lists.newArrayList();
- columnNullabilities = Lists.newArrayList();
-
- try (Statement st = store.getConnection().createStatement()) {
- if (columns == null || columns.length() == 0) {
- columns = "*";
+ ResultSet rsColumns;
+ DatabaseMetaData meta = store.getConnection().getMetaData();
+ /**Identifiers (table names, column names etc.) may be stored internally in either uppercase or lowercase.**/
+ rsColumns = meta.getColumns(null, null, getTablename().toUpperCase(), null);
+ if (!rsColumns.isBeforeFirst()) {
+ rsColumns = meta.getColumns(null, null, getTablename().toLowerCase(), null);
+ if (!rsColumns.isBeforeFirst()) {
+ /** If the table name is in quotes then some Databases store it without doing any uppercase or lowercase conversions */
+ rsColumns = meta.getColumns(null, null, getTablename(), null);
+ if (!rsColumns.isBeforeFirst()) {
+ throw new RuntimeException("Table name not found");
+ }
}
- ResultSet rs = st.executeQuery("select " + columns + " from " + getTablename());
-
- ResultSetMetaData rsMetaData = rs.getMetaData();
- LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
-
- for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
- int type = rsMetaData.getColumnType(i);
- String columnName = rsMetaData.getColumnName(i);
- columnNames.add(columnName);
- columnDataTypes.add(type);
- columnNullabilities.add(rsMetaData.isNullable(i));
- LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
+ }
+ boolean readAllColumns = columnNamesSet.size() == 0 ? true : false;
+ int remainingColumns = columnNamesSet.size();
+ while (rsColumns.next()) {
+ if (readAllColumns || remainingColumns > 0) {
+ if (readAllColumns || columnNamesSet.contains(rsColumns.getString("COLUMN_NAME").toUpperCase())) {
+ columnNames.add(rsColumns.getString("COLUMN_NAME"));
+ columnNullabilities.add(rsColumns.getInt("NULLABLE"));
+ columnDataTypes.add(rsColumns.getInt("DATA_TYPE"));
+ remainingColumns--;
+ }
+ } else {
+ break;
}
}
}