You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/03/19 19:40:43 UTC

[geode] branch develop updated: GEODE-4833: JdbcWriter and JdbcAsyncWriter may fail to write null fields to database (#1636)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b4ad610  GEODE-4833: JdbcWriter and JdbcAsyncWriter may fail to write null fields to database (#1636)
b4ad610 is described below

commit b4ad610f1bd20b7b87e9679b4637893e2bf3c5b8
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Mon Mar 19 12:40:40 2018 -0700

    GEODE-4833: JdbcWriter and JdbcAsyncWriter may fail to write null fields to database (#1636)
    
    Null values are now supported. Metadata read from the database is now used to determine each
    column's type, which is passed to setNull when a field value is null.
    
    * Renamed TableKeyColumnManager to TableMetaDataManager
    * registerPdxMetaData will now throw an exception if the instance is not serialized with PDX
---
 .../jdbc/internal/AbstractJdbcCallback.java        |   4 +-
 .../connectors/jdbc/internal/ColumnValue.java      |  13 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |  31 ++-
 .../connectors/jdbc/internal/TableMetaData.java    |  54 +++++
 ...olumnManager.java => TableMetaDataManager.java} |  52 +++--
 .../jdbc/internal/TableMetaDataView.java           |  23 +++
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |   5 +-
 .../geode/connectors/jdbc/JdbcDUnitTest.java       |  81 +++++++-
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |   4 +-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java |   4 +-
 .../connectors/jdbc/internal/ColumnValueTest.java  |  12 +-
 .../connectors/jdbc/internal/SqlHandlerTest.java   |  50 ++++-
 .../jdbc/internal/SqlStatementFactoryTest.java     |  10 +-
 .../jdbc/internal/TableKeyColumnManagerTest.java   | 141 -------------
 .../TableMetaDataManagerIntegrationTest.java       | 106 ++++++++++
 .../jdbc/internal/TableMetaDataManagerTest.java    | 226 +++++++++++++++++++++
 .../java/org/apache/geode/cache/GemFireCache.java  |   6 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |   6 +-
 .../geode/internal/cache/GemFireCacheImplTest.java |  33 +++
 19 files changed, 662 insertions(+), 199 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index b3baabe..3fbf8c0 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -59,9 +59,9 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
     if (sqlHandler == null) {
       this.cache = cache;
       JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
-      TableKeyColumnManager tableKeyColumnManager = new TableKeyColumnManager();
+      TableMetaDataManager tableMetaDataManager = new TableMetaDataManager();
       DataSourceManager manager = new DataSourceManager(new HikariJdbcDataSourceFactory());
-      sqlHandler = new SqlHandler(manager, tableKeyColumnManager, service);
+      sqlHandler = new SqlHandler(manager, tableMetaDataManager, service);
     }
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
index c3f44d0..581b975 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
@@ -14,15 +14,19 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
+import java.sql.JDBCType;
+
 class ColumnValue {
   private final boolean isKey;
   private final String columnName;
   private final Object value;
+  private final int dataType;
 
-  ColumnValue(boolean isKey, String columnName, Object value) {
+  ColumnValue(boolean isKey, String columnName, Object value, int dataType) {
     this.isKey = isKey;
     this.columnName = columnName;
     this.value = value;
+    this.dataType = dataType;
   }
 
   boolean isKey() {
@@ -37,8 +41,13 @@ class ColumnValue {
     return value;
   }
 
+  int getDataType() {
+    return dataType;
+  }
+
   @Override
   public String toString() {
-    return "ColumnValue [isKey=" + isKey + ", columnName=" + columnName + ", value=" + value + "]";
+    return "ColumnValue [isKey=" + isKey + ", columnName=" + columnName + ", value=" + value
+        + ", dataType=" + JDBCType.valueOf(dataType) + "]";
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index d45815a..e277254 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -34,12 +34,12 @@ import org.apache.geode.pdx.PdxInstance;
 public class SqlHandler {
   private final JdbcConnectorService configService;
   private final DataSourceManager manager;
-  private final TableKeyColumnManager tableKeyColumnManager;
+  private final TableMetaDataManager tableMetaDataManager;
 
-  public SqlHandler(DataSourceManager manager, TableKeyColumnManager tableKeyColumnManager,
+  public SqlHandler(DataSourceManager manager, TableMetaDataManager tableMetaDataManager,
       JdbcConnectorService configService) {
     this.manager = manager;
-    this.tableKeyColumnManager = tableKeyColumnManager;
+    this.tableMetaDataManager = tableMetaDataManager;
     this.configService = configService;
   }
 
@@ -105,7 +105,7 @@ public class SqlHandler {
   }
 
   private String getKeyColumnName(Connection connection, String tableName) {
-    return this.tableKeyColumnManager.getKeyColumnName(connection, tableName);
+    return this.tableMetaDataManager.getTableMetaDataView(connection, tableName).getKeyColumnName();
   }
 
   private void setValuesInStatement(PreparedStatement statement, List<ColumnValue> columnList)
@@ -117,7 +117,11 @@ public class SqlHandler {
       if (value instanceof Character) {
         value = ((Character) value).toString();
       }
-      statement.setObject(index, value);
+      if (value == null) {
+        statement.setNull(index, columnValue.getDataType());
+      } else {
+        statement.setObject(index, value);
+      }
     }
   }
 
@@ -196,27 +200,32 @@ public class SqlHandler {
   <K> List<ColumnValue> getColumnToValueList(Connection connection, RegionMapping regionMapping,
       K key, PdxInstance value, Operation operation) {
     String tableName = regionMapping.getRegionToTableName();
-    String keyColumnName = getKeyColumnName(connection, tableName);
-    ColumnValue keyColumnValue = new ColumnValue(true, keyColumnName, key);
+    TableMetaDataView tableMetaData =
+        this.tableMetaDataManager.getTableMetaDataView(connection, tableName);
+    String keyColumnName = tableMetaData.getKeyColumnName();
+    ColumnValue keyColumnValue =
+        new ColumnValue(true, keyColumnName, key, tableMetaData.getColumnDataType(keyColumnName));
 
     if (operation.isDestroy() || operation.isGet()) {
       return Collections.singletonList(keyColumnValue);
     }
 
-    List<ColumnValue> result = createColumnValueList(regionMapping, value, keyColumnName);
+    List<ColumnValue> result =
+        createColumnValueList(tableMetaData, regionMapping, value, keyColumnName);
     result.add(keyColumnValue);
     return result;
   }
 
-  private List<ColumnValue> createColumnValueList(RegionMapping regionMapping, PdxInstance value,
-      String keyColumnName) {
+  private List<ColumnValue> createColumnValueList(TableMetaDataView tableMetaData,
+      RegionMapping regionMapping, PdxInstance value, String keyColumnName) {
     List<ColumnValue> result = new ArrayList<>();
     for (String fieldName : value.getFieldNames()) {
       String columnName = regionMapping.getColumnNameForField(fieldName);
       if (columnName.equalsIgnoreCase(keyColumnName)) {
         continue;
       }
-      ColumnValue columnValue = new ColumnValue(false, columnName, value.getField(fieldName));
+      ColumnValue columnValue = new ColumnValue(false, columnName, value.getField(fieldName),
+          tableMetaData.getColumnDataType(columnName));
       result.add(columnValue);
     }
     return result;
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
new file mode 100644
index 0000000..12e7682
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.HashMap;
+
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+
+public class TableMetaData implements TableMetaDataView {
+
+  private final String keyColumnName;
+  private final HashMap<String, Integer> columnNameToTypeMap = new HashMap<>();
+
+  public TableMetaData(String keyColumnName) {
+    this.keyColumnName = keyColumnName;
+  }
+
+  @Override
+  public String getKeyColumnName() {
+    return this.keyColumnName;
+  }
+
+  @Override
+  public int getColumnDataType(String columnName) {
+    Integer dataType = this.columnNameToTypeMap.get(columnName.toLowerCase());
+    if (dataType == null) {
+      return 0;
+    }
+    return dataType;
+  }
+
+  public void addDataType(String columnName, int dataType) {
+    Integer previousDataType = this.columnNameToTypeMap.put(columnName.toLowerCase(), dataType);
+    if (previousDataType != null) {
+      throw new JdbcConnectorException(
+          "Column names must be different in case. Two columns both have the name "
+              + columnName.toLowerCase());
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
similarity index 62%
rename from geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java
rename to geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
index b7d972d..4d9a46f 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
@@ -30,26 +30,29 @@ import org.apache.geode.connectors.jdbc.JdbcConnectorException;
  * than one column as a primary key or no columns then an exception is thrown. The computation is
  * remembered so that it does not need to be recomputed for the same table name.
  */
-public class TableKeyColumnManager {
-  private final ConcurrentMap<String, String> tableToPrimaryKeyMap = new ConcurrentHashMap<>();
+public class TableMetaDataManager {
+  private final ConcurrentMap<String, TableMetaDataView> tableToMetaDataMap =
+      new ConcurrentHashMap<>();
 
-  public String getKeyColumnName(Connection connection, String tableName) {
-    return tableToPrimaryKeyMap.computeIfAbsent(tableName,
-        k -> computeKeyColumnName(connection, k));
+  public TableMetaDataView getTableMetaDataView(Connection connection, String tableName) {
+    return tableToMetaDataMap.computeIfAbsent(tableName,
+        k -> computeTableMetaDataView(connection, k));
   }
 
-  private String computeKeyColumnName(Connection connection, String tableName) {
-    String key;
+  private TableMetaDataView computeTableMetaDataView(Connection connection, String tableName) {
+    TableMetaData result;
     try {
       DatabaseMetaData metaData = connection.getMetaData();
       try (ResultSet tables = metaData.getTables(null, null, "%", null)) {
         String realTableName = getTableNameFromMetaData(tableName, tables);
-        key = getPrimaryKeyColumnNameFromMetaData(realTableName, metaData);
+        String key = getPrimaryKeyColumnNameFromMetaData(realTableName, metaData);
+        result = new TableMetaData(key);
+        getDataTypesFromMetaData(realTableName, metaData, result);
       }
     } catch (SQLException e) {
       throw JdbcConnectorException.createException(e);
     }
-    return key;
+    return result;
   }
 
   private String getTableNameFromMetaData(String tableName, ResultSet tables) throws SQLException {
@@ -72,17 +75,28 @@ public class TableKeyColumnManager {
 
   private String getPrimaryKeyColumnNameFromMetaData(String tableName, DatabaseMetaData metaData)
       throws SQLException {
-    ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName);
-    if (!primaryKeys.next()) {
-      throw new JdbcConnectorException(
-          "The table " + tableName + " does not have a primary key column.");
-    }
-    String key = primaryKeys.getString("COLUMN_NAME");
-    if (primaryKeys.next()) {
-      throw new JdbcConnectorException(
-          "The table " + tableName + " has more than one primary key column.");
+    try (ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName)) {
+      if (!primaryKeys.next()) {
+        throw new JdbcConnectorException(
+            "The table " + tableName + " does not have a primary key column.");
+      }
+      String key = primaryKeys.getString("COLUMN_NAME");
+      if (primaryKeys.next()) {
+        throw new JdbcConnectorException(
+            "The table " + tableName + " has more than one primary key column.");
+      }
+      return key;
     }
-    return key;
   }
 
+  private void getDataTypesFromMetaData(String tableName, DatabaseMetaData metaData,
+      TableMetaData result) throws SQLException {
+    try (ResultSet columnData = metaData.getColumns(null, null, tableName, "%")) {
+      while (columnData.next()) {
+        String columnName = columnData.getString("COLUMN_NAME");
+        int dataType = columnData.getInt("DATA_TYPE");
+        result.addDataType(columnName, dataType);
+      }
+    }
+  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
new file mode 100644
index 0000000..c78c8f7
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+public interface TableMetaDataView {
+  public String getKeyColumnName();
+
+  public int getColumnDataType(String columnName);
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 9da3033..c1adcb9 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -30,14 +30,13 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfigExistsException;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
-import org.apache.geode.connectors.jdbc.internal.TableKeyColumnManager;
+import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
 import org.apache.geode.internal.cache.InternalCache;
@@ -241,7 +240,7 @@ public class JdbcAsyncWriterIntegrationTest {
 
   private SqlHandler createSqlHandler()
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    return new SqlHandler(new TestableConnectionManager(), new TableKeyColumnManager(),
+    return new SqlHandler(new TestableConnectionManager(), new TableMetaDataManager(),
         TestConfigService.getTestConfigService());
   }
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
index ed86c82..7ae1488 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -24,6 +24,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.Date;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -104,16 +105,39 @@ public class JdbcDUnitTest implements Serializable {
       Statement statement = connection.createStatement();
       statement.execute("Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
           + "aboolean smallint, " + "abyte smallint, " + "ashort smallint, " + "anint int, "
-          + "along bigint, " + "afloat float, " + "adouble float, " + "astring varchar(10), "
+          + "along bigint, " + "afloat float, " + "adouble double, " + "astring varchar(10), "
           + "adate timestamp, " + "anobject varchar(20), " + "abytearray blob(100), "
           + "achar char(1))");
     });
   }
 
-  private void insertDataForAllSupportedFieldsTable(String key,
-      ClassWithSupportedPdxFields classWithSupportedPdxFields) {
+  private void insertNullDataForAllSupportedFieldsTable(String key) {
+    server.invoke(() -> {
+      Connection connection = DriverManager.getConnection(CONNECTION_URL);
+
+      String insertQuery = "Insert into " + TABLE_NAME + " values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
+      System.out.println("### Query is :" + insertQuery);
+      PreparedStatement statement = connection.prepareStatement(insertQuery);
+      statement.setObject(1, key);
+      statement.setNull(2, Types.SMALLINT);
+      statement.setNull(3, Types.SMALLINT);
+      statement.setNull(4, Types.SMALLINT);
+      statement.setNull(5, Types.INTEGER);
+      statement.setNull(6, Types.BIGINT);
+      statement.setNull(7, Types.FLOAT);
+      statement.setNull(8, Types.DOUBLE);
+      statement.setNull(9, Types.VARCHAR);
+      statement.setNull(10, Types.TIMESTAMP);
+      statement.setNull(11, Types.VARCHAR);
+      statement.setNull(12, Types.BLOB);
+      statement.setNull(13, Types.CHAR);
+
+      statement.execute();
+    });
+  }
+
+  private void insertDataForAllSupportedFieldsTable(String key, ClassWithSupportedPdxFields data) {
     server.invoke(() -> {
-      ClassWithSupportedPdxFields data = classWithSupportedPdxFields;
       Connection connection = DriverManager.getConnection(CONNECTION_URL);
 
       String insertQuery = "Insert into " + TABLE_NAME + " values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
@@ -385,6 +409,28 @@ public class JdbcDUnitTest implements Serializable {
   }
 
   @Test
+  public void clientPutsAndGetsWithNullFieldsWithPdxClassName() throws Exception {
+    createTableForAllSupportedFields();
+    ClientVM client = getClientVM();
+    createClientRegion(client);
+
+    createRegionUsingGfsh(true, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME, ClassWithSupportedPdxFields.class.getName(), false);
+    client.invoke(() -> {
+      String key = "id1";
+      ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields();
+      Region<String, ClassWithSupportedPdxFields> region =
+          ClusterStartupRule.getClientCache().getRegion(REGION_NAME);
+      region.put(key, value);
+      region.invalidate(key);
+
+      ClassWithSupportedPdxFields result = region.get(key);
+      assertThat(result).isEqualTo(value);
+    });
+  }
+
+  @Test
   public void clientRegistersPdxAndReadsFromDBWithPdxClassName() throws Exception {
     createTableForAllSupportedFields();
     ClientVM client = getClientVM();
@@ -411,6 +457,33 @@ public class JdbcDUnitTest implements Serializable {
     });
   }
 
+  @Test
+  public void clientRegistersPdxAndReadsFromDBContainingNullColumnsWithPdxClassName()
+      throws Exception {
+    createTableForAllSupportedFields();
+    ClientVM client = getClientVM();
+    createClientRegion(client);
+    createRegionUsingGfsh(true, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME, ClassWithSupportedPdxFields.class.getName(), false);
+    String key = "id1";
+    ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields();
+
+    server.invoke(() -> {
+      insertNullDataForAllSupportedFieldsTable(key);
+    });
+
+    client.invoke(() -> {
+      ClusterStartupRule.getClientCache().registerPdxMetaData(new ClassWithSupportedPdxFields());
+
+      Region<String, ClassWithSupportedPdxFields> region =
+          ClusterStartupRule.getClientCache().getRegion(REGION_NAME);
+
+      ClassWithSupportedPdxFields result = region.get(key);
+      assertThat(result).isEqualTo(value);
+    });
+  }
+
   private ClientVM getClientVM() throws Exception {
     Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> {
       System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index bbdb571..dc92514 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -37,7 +37,7 @@ import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfigExistsException;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
-import org.apache.geode.connectors.jdbc.internal.TableKeyColumnManager;
+import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
 import org.apache.geode.internal.cache.InternalCache;
@@ -180,7 +180,7 @@ public class JdbcLoaderIntegrationTest {
 
   private SqlHandler createSqlHandler(String pdxClassName, boolean primaryKeyInValue)
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    return new SqlHandler(new TestableConnectionManager(), new TableKeyColumnManager(),
+    return new SqlHandler(new TestableConnectionManager(), new TableMetaDataManager(),
         TestConfigService.getTestConfigService((InternalCache) cache, pdxClassName,
             primaryKeyInValue));
   }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
index e4cbb16..5e42631 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -38,7 +38,7 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfigExistsException;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
-import org.apache.geode.connectors.jdbc.internal.TableKeyColumnManager;
+import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
 import org.apache.geode.internal.cache.InternalCache;
@@ -226,7 +226,7 @@ public class JdbcWriterIntegrationTest {
 
   private SqlHandler createSqlHandler()
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    return new SqlHandler(new TestableConnectionManager(), new TableKeyColumnManager(),
+    return new SqlHandler(new TestableConnectionManager(), new TableMetaDataManager(),
         TestConfigService.getTestConfigService());
   }
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
index 2cdde6b..c25dd25 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
@@ -16,6 +16,8 @@ package org.apache.geode.connectors.jdbc.internal;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.sql.JDBCType;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -27,12 +29,13 @@ public class ColumnValueTest {
 
   private static final String COLUMN_NAME = "columnName";
   private static final Object VALUE = new Object();
+  private static final JDBCType DATA_TYPE = JDBCType.VARCHAR;
 
   private ColumnValue value;
 
   @Before
   public void setup() {
-    value = new ColumnValue(true, COLUMN_NAME, VALUE);
+    value = new ColumnValue(true, COLUMN_NAME, VALUE, DATA_TYPE.getVendorTypeNumber());
   }
 
   @Test
@@ -51,8 +54,13 @@ public class ColumnValueTest {
   }
 
   @Test
+  public void hasCorrectDataType() {
+    assertThat(value.getDataType()).isSameAs(DATA_TYPE.getVendorTypeNumber());
+  }
+
+  @Test
   public void toStringContainsAllVariables() {
     assertThat(value.toString()).contains(Boolean.toString(true)).contains(COLUMN_NAME)
-        .contains(VALUE.toString());
+        .contains(VALUE.toString()).contains(DATA_TYPE.toString());
   }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index f47be9d..efd929f 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -71,7 +71,8 @@ public class SqlHandlerTest {
   private JdbcDataSource dataSource;
   private ConnectionConfiguration connectionConfig;
   private JdbcConnectorService connectorService;
-  private TableKeyColumnManager tableKeyColumnManager;
+  private TableMetaDataManager tableMetaDataManager;
+  private TableMetaDataView tableMetaDataView;
   private Connection connection;
   private Region region;
   private InternalCache cache;
@@ -93,10 +94,13 @@ public class SqlHandlerTest {
     cache = mock(InternalCache.class);
     connection = mock(Connection.class);
     when(region.getRegionService()).thenReturn(cache);
-    tableKeyColumnManager = mock(TableKeyColumnManager.class);
-    when(tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME)).thenReturn(KEY_COLUMN);
+    tableMetaDataManager = mock(TableMetaDataManager.class);
+    tableMetaDataView = mock(TableMetaDataView.class);
+    when(tableMetaDataView.getKeyColumnName()).thenReturn(KEY_COLUMN);
+    when(tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .thenReturn(tableMetaDataView);
     connectorService = mock(JdbcConnectorService.class);
-    handler = new SqlHandler(manager, tableKeyColumnManager, connectorService);
+    handler = new SqlHandler(manager, tableMetaDataManager, connectorService);
     key = "key";
     value = mock(PdxInstanceImpl.class);
     when(value.getPdxType()).thenReturn(mock(PdxType.class));
@@ -209,6 +213,44 @@ public class SqlHandlerTest {
   }
 
   @Test
+  public void writeWithNullField() throws Exception {
+    String fieldName = "fieldName";
+    Object fieldValue = null;
+    int dataType = 0;
+    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
+    when(value.getField(fieldName)).thenReturn(fieldValue);
+
+    when(statement.executeUpdate()).thenReturn(1);
+    Object createKey = "createKey";
+    handler.write(region, Operation.CREATE, createKey, value);
+
+    verify(statement).setNull(1, dataType);
+    verify(statement).setObject(2, createKey);
+    verify(statement).close();
+  }
+
+  @Test
+  public void writeWithNullFieldWithDataTypeFromMetaData() throws Exception {
+    String fieldName = "fieldName";
+    Object fieldValue = null;
+    int dataType = 79;
+    when(tableMetaDataView.getColumnDataType(fieldName)).thenReturn(dataType);
+    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
+    when(value.getField(fieldName)).thenReturn(fieldValue);
+
+    when(statement.executeUpdate()).thenReturn(1);
+    Object createKey = "createKey";
+    handler.write(region, Operation.CREATE, createKey, value);
+
+    verify(statement).setNull(1, dataType);
+    verify(statement).setObject(2, createKey);
+    verify(statement).close();
+  }
+
+
+  @Test
   public void insertActionSucceeds() throws Exception {
     when(statement.executeUpdate()).thenReturn(1);
     Object createKey = "createKey";
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
index 988d792..b8bdbc2 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -36,9 +36,9 @@ public class SqlStatementFactoryTest {
 
   @Before
   public void setup() {
-    columnValues.add(new ColumnValue(false, "column0", null));
-    columnValues.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
-    columnValues.add(new ColumnValue(false, "column2", null));
+    columnValues.add(new ColumnValue(false, "column0", null, 0));
+    columnValues.add(new ColumnValue(true, KEY_COLUMN_NAME, null, 0));
+    columnValues.add(new ColumnValue(false, "column2", null, 0));
   }
 
   @Test
@@ -46,7 +46,7 @@ public class SqlStatementFactoryTest {
     String expectedStatement =
         String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
     List<ColumnValue> keyColumn = new ArrayList<>();
-    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null, 0));
 
     String statement = factory.createSelectQueryString(TABLE_NAME, keyColumn);
 
@@ -58,7 +58,7 @@ public class SqlStatementFactoryTest {
     String expectedStatement =
         String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
     List<ColumnValue> keyColumn = new ArrayList<>();
-    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null, 0));
 
     String statement = factory.createDestroySqlString(TABLE_NAME, keyColumn);
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java
deleted file mode 100644
index bd56df6..0000000
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.connectors.jdbc.JdbcConnectorException;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class TableKeyColumnManagerTest {
-  private static final String TABLE_NAME = "testTable";
-  private static final String KEY_COLUMN = "keyColumn";
-
-  private TableKeyColumnManager tableKeyColumnManager;
-  private Connection connection;
-
-  @Before
-  public void setup() throws Exception {
-    tableKeyColumnManager = new TableKeyColumnManager();
-    connection = mock(Connection.class);
-  }
-
-  @Test
-  public void returnsSinglePrimaryKeyColumnName() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
-    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
-
-    assertThat(tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isEqualTo(KEY_COLUMN);
-    verify(connection).getMetaData();
-  }
-
-  @Test
-  public void secondCallDoesNotUseMetaData() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
-    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
-
-    assertThat(tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isEqualTo(KEY_COLUMN);
-    assertThat(tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isEqualTo(KEY_COLUMN);
-    verify(connection).getMetaData();
-  }
-
-  @Test
-  public void throwsExceptionWhenFailsToGetTableMetadata() throws Exception {
-    when(connection.getMetaData()).thenThrow(SQLException.class);
-
-    assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(JdbcConnectorException.class);
-  }
-
-  @Test
-  public void throwsExceptionWhenDesiredTableNotFound() throws Exception {
-    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
-    ResultSet resultSet = mock(ResultSet.class);
-    when(connection.getMetaData()).thenReturn(metadata);
-    when(metadata.getTables(any(), any(), any(), any())).thenReturn(resultSet);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    when(resultSet.getString("TABLE_NAME")).thenReturn("otherTable");
-
-    assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(JdbcConnectorException.class);
-  }
-
-  @Test
-  public void throwsExceptionIfTableHasCompositePrimaryKey() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
-    when(primaryKeys.next()).thenReturn(true);
-
-    assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(JdbcConnectorException.class);
-  }
-
-  @Test
-  public void throwsExceptionWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
-    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
-    ResultSet resultSet = mock(ResultSet.class);
-    when(connection.getMetaData()).thenReturn(metadata);
-    when(metadata.getTables(any(), any(), any(), any())).thenReturn(resultSet);
-    when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
-    when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
-    when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase());
-
-    assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(JdbcConnectorException.class)
-        .hasMessage("Duplicate tables that match region name");
-  }
-
-  @Test
-  public void throwsExceptionWhenNoPrimaryKeyInTable() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
-    when(primaryKeys.next()).thenReturn(false);
-
-    assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(JdbcConnectorException.class);
-  }
-
-  private ResultSet getPrimaryKeysMetaData() throws SQLException {
-    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
-    ResultSet resultSet = mock(ResultSet.class);
-    ResultSet primaryKeys = mock(ResultSet.class);
-
-    when(connection.getMetaData()).thenReturn(metadata);
-    when(metadata.getTables(any(), any(), any(), any())).thenReturn(resultSet);
-    when(metadata.getPrimaryKeys(any(), any(), anyString())).thenReturn(primaryKeys);
-    when(primaryKeys.getString("COLUMN_NAME")).thenReturn(KEY_COLUMN);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
-
-    return primaryKeys;
-  }
-}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
new file mode 100644
index 0000000..28fcfa1
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.sql.Types;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Integration test that runs {@link TableMetaDataManager} against an in-memory Derby table and
+ * checks the key column name and the column data types it reports.
+ */
+@Category(IntegrationTest.class)
+public class TableMetaDataManagerIntegrationTest {
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+
+  private TableMetaDataManager manager;
+  private Connection connection;
+  private Statement statement;
+
+  @Before
+  public void setup() throws Exception {
+    connection = DriverManager.getConnection(CONNECTION_URL);
+    statement = connection.createStatement();
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    manager = new TableMetaDataManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    closeDB();
+  }
+
+  /** Drops the test table and releases the JDBC resources, even when the drop itself fails. */
+  private void closeDB() throws Exception {
+    if (connection == null) {
+      // setup() never obtained a connection, so there is nothing to drop or close.
+      return;
+    }
+    try (Connection conn = connection;
+        Statement stmt = statement != null ? statement : conn.createStatement()) {
+      stmt.execute("Drop table " + REGION_TABLE_NAME);
+    }
+  }
+
+  /** Reads the meta-data view of the test table through the manager under test. */
+  private TableMetaDataView readMetaData() {
+    return manager.getTableMetaDataView(connection, REGION_TABLE_NAME);
+  }
+
+  @Test
+  public void validateKeyColumnName() {
+    String keyColumnName = readMetaData().getKeyColumnName();
+
+    assertThat(keyColumnName).isEqualToIgnoringCase("id");
+  }
+
+  @Test
+  public void validateColumnDataTypeForName() {
+    int nameDataType = readMetaData().getColumnDataType("name");
+
+    assertThat(nameDataType).isEqualTo(Types.VARCHAR);
+  }
+
+  @Test
+  public void validateColumnDataTypeForId() {
+    int idDataType = readMetaData().getColumnDataType("id");
+
+    assertThat(idDataType).isEqualTo(Types.VARCHAR);
+  }
+
+  @Test
+  public void validateColumnDataTypeForAge() {
+    int ageDataType = readMetaData().getColumnDataType("age");
+
+    assertThat(ageDataType).isEqualTo(Types.INTEGER);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
new file mode 100644
index 0000000..6488b59
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/** Unit tests for {@link TableMetaDataManager} with every JDBC object replaced by a mock. */
+@Category(UnitTest.class)
+public class TableMetaDataManagerTest {
+  private static final String TABLE_NAME = "testTable";
+  private static final String KEY_COLUMN = "keyColumn";
+
+  private TableMetaDataManager tableMetaDataManager;
+  private Connection connection;
+  DatabaseMetaData databaseMetaData;
+  ResultSet tablesResultSet;
+  ResultSet primaryKeysResultSet;
+  ResultSet columnResultSet;
+
+  @Before
+  public void setup() throws Exception {
+    tableMetaDataManager = new TableMetaDataManager();
+    connection = mock(Connection.class);
+    databaseMetaData = mock(DatabaseMetaData.class);
+    tablesResultSet = mock(ResultSet.class);
+    primaryKeysResultSet = mock(ResultSet.class);
+    columnResultSet = mock(ResultSet.class);
+    when(connection.getMetaData()).thenReturn(databaseMetaData);
+    when(databaseMetaData.getTables(any(), any(), any(), any())).thenReturn(tablesResultSet);
+    when(databaseMetaData.getPrimaryKeys(any(), any(), anyString()))
+        .thenReturn(primaryKeysResultSet);
+    when(databaseMetaData.getColumns(any(), any(), eq(TABLE_NAME), any()))
+        .thenReturn(columnResultSet);
+  }
+
+  @Test
+  public void returnsSinglePrimaryKeyColumnName() throws Exception {
+    setupSinglePrimaryKey();
+
+    TableMetaDataView view = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+
+    assertThat(view.getKeyColumnName()).isEqualTo(KEY_COLUMN);
+    verify(connection).getMetaData();
+  }
+
+  @Test
+  public void secondCallDoesNotUseMetaData() throws Exception {
+    setupSinglePrimaryKey();
+
+    tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+    tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+    // verify(...) means exactly once, so the second call must not have asked the connection.
+    verify(connection).getMetaData();
+  }
+
+  @Test
+  public void throwsExceptionWhenFailsToGetTableMetadata() throws Exception {
+    when(connection.getMetaData()).thenThrow(new SQLException("sql message"));
+
+    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .isInstanceOf(JdbcConnectorException.class)
+        .hasMessageContaining("sql message");
+  }
+
+  @Test
+  public void throwsExceptionWhenDesiredTableNotFound() throws Exception {
+    when(tablesResultSet.next()).thenReturn(true, false);
+    when(tablesResultSet.getString("TABLE_NAME")).thenReturn("otherTable");
+
+    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .isInstanceOf(JdbcConnectorException.class)
+        .hasMessage("no table was found that matches testTable");
+  }
+
+  @Test
+  public void throwsExceptionIfTableHasCompositePrimaryKey() throws Exception {
+    setupPrimaryKeysMetaData();
+    // next() is true on every call, so more than one primary-key row is reported.
+    when(primaryKeysResultSet.next()).thenReturn(true);
+
+    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .isInstanceOf(JdbcConnectorException.class)
+        .hasMessage("The table " + TABLE_NAME + " has more than one primary key column.");
+  }
+
+  @Test
+  public void throwsExceptionWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
+    when(tablesResultSet.next()).thenReturn(true, true, false);
+    // The answers are chained on one stub; a second when(...) would replace the first answer.
+    when(tablesResultSet.getString("TABLE_NAME"))
+        .thenReturn(TABLE_NAME, TABLE_NAME.toUpperCase());
+
+    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .isInstanceOf(JdbcConnectorException.class)
+        .hasMessage("Duplicate tables that match region name");
+  }
+
+  @Test
+  public void throwsExceptionWhenNoPrimaryKeyInTable() throws Exception {
+    setupPrimaryKeysMetaData();
+    // next() is false right away: the primary-key result set is empty.
+    when(primaryKeysResultSet.next()).thenReturn(false);
+
+    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .isInstanceOf(JdbcConnectorException.class)
+        .hasMessage("The table " + TABLE_NAME + " does not have a primary key column.");
+  }
+
+  @Test
+  public void unknownColumnsDataTypeIsZero() throws SQLException {
+    setupSinglePrimaryKey();
+
+    TableMetaDataView view = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+
+    assertThat(view.getColumnDataType("unknownColumn")).isEqualTo(0);
+  }
+
+  @Test
+  public void validateExpectedDataTypes() throws SQLException {
+    String firstColumn = "columnName1";
+    String secondColumn = "columnName2";
+    int firstType = 1;
+    int secondType = 2;
+    setupSinglePrimaryKey();
+    // Two column rows; each getter answers for the first row, then for the second.
+    when(columnResultSet.next()).thenReturn(true, true, false);
+    when(columnResultSet.getString("COLUMN_NAME")).thenReturn(firstColumn, secondColumn);
+    when(columnResultSet.getInt("DATA_TYPE")).thenReturn(firstType, secondType);
+
+    TableMetaDataView view = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+    assertThat(view.getColumnDataType(firstColumn)).isEqualTo(firstType);
+    assertThat(view.getColumnDataType(secondColumn)).isEqualTo(secondType);
+    verify(primaryKeysResultSet).close();
+    verify(columnResultSet).close();
+  }
+
+  @Test
+  public void validateThatCloseOnPrimaryKeysResultSetIsCalledByGetTableMetaDataView()
+      throws SQLException {
+    setupSinglePrimaryKey();
+
+    tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+
+    verify(primaryKeysResultSet).close();
+  }
+
+  @Test
+  public void validateThatCloseOnColumnResultSetIsCalledByGetTableMetaDataView()
+      throws SQLException {
+    setupSinglePrimaryKey();
+
+    tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+
+    verify(columnResultSet).close();
+  }
+
+  @Test
+  public void lookingUpDataTypeWithNameThatDiffersInCaseWillFindIt() throws SQLException {
+    String storedName = "columnName1";
+    int storedType = 1;
+    setupSinglePrimaryKey();
+    when(columnResultSet.next()).thenReturn(true, false);
+    when(columnResultSet.getString("COLUMN_NAME")).thenReturn(storedName);
+    when(columnResultSet.getInt("DATA_TYPE")).thenReturn(storedType);
+
+    TableMetaDataView view = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+    // Look the column up in upper case although the meta-data reported it in mixed case.
+    assertThat(view.getColumnDataType(storedName.toUpperCase())).isEqualTo(storedType);
+  }
+
+  @Test
+  public void throwsExceptionWhenTwoColumnsWithSameCaseInsensitiveNameExist() throws Exception {
+    setupSinglePrimaryKey();
+    // Two column rows whose names differ only in case.
+    when(columnResultSet.next()).thenReturn(true, true, false);
+    when(columnResultSet.getString("COLUMN_NAME")).thenReturn("colName", "COLNAME");
+
+    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
+        .isInstanceOf(JdbcConnectorException.class).hasMessage(
+            "Column names must be different in case. Two columns both have the name colname");
+  }
+
+  /** Stubs one table named TABLE_NAME; every primary-key row reports KEY_COLUMN. */
+  private void setupPrimaryKeysMetaData() throws SQLException {
+    when(primaryKeysResultSet.getString("COLUMN_NAME")).thenReturn(KEY_COLUMN);
+    when(tablesResultSet.next()).thenReturn(true, false);
+    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+  }
+
+  /** Same stubbing as setupPrimaryKeysMetaData() plus exactly one primary-key row. */
+  private void setupSinglePrimaryKey() throws SQLException {
+    setupPrimaryKeysMetaData();
+    when(primaryKeysResultSet.next()).thenReturn(true, false);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java b/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java
index e120432..f2aa41a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java
@@ -218,6 +218,9 @@ public interface GemFireCache extends RegionService {
    * Registers PDX meta-data given an instance of a domain class that will be serialized
    * with PDX.
    * <p>
+   * Note that this method serializes the given instance so PDX must already be configured
+   * to do the serialization.
+   * <p>
    * Note that if the instance is not of a class that will be serialized with PDX
    * then no meta-data is registered.
    * <p>
@@ -226,7 +229,8 @@ public interface GemFireCache extends RegionService {
    * table using geode.
    *
    * @param instance the instance of the domain class for which meta-data is to be registered
-   * @throws SerializationException if the instance can not be serialized
+   * @throws SerializationException if the instance can not be serialized or is not serialized with
+   *         PDX
    * @since Geode 1.6
    */
   void registerPdxMetaData(Object instance);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index a82e152..c4b11b4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -174,6 +174,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.i18n.LogWriterI18n;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.ClassPathLoader;
+import org.apache.geode.internal.DSCODE;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.backup.BackupService;
@@ -5339,7 +5340,10 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
   @Override
   public void registerPdxMetaData(Object instance) {
     try {
-      BlobHelper.serializeToBlob(instance);
+      byte[] blob = BlobHelper.serializeToBlob(instance);
+      if (blob.length == 0 || blob[0] != DSCODE.PDX) {
+        throw new SerializationException("The instance is not PDX serializable");
+      }
     } catch (IOException e) {
       throw new SerializationException("Serialization failed", e);
     }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index 8161926..c2233df 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -15,9 +15,11 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.io.NotSerializableException;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -29,6 +31,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.SerializationException;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.wan.GatewayReceiver;
@@ -118,6 +121,36 @@ public class GemFireCacheImplTest {
   }
 
   @Test
+  public void registerPdxMetaDataThrowsIfInstanceNotSerializable() {
+    GemFireCacheImpl gemFireCache = GemFireCacheImpl.createWithAsyncEventListeners(
+        Fakes.distributedSystem(), new CacheConfig(), mock(TypeRegistry.class));
+    try {
+      // new Object() cannot be serialized; the NotSerializableException is expected as cause.
+      assertThatThrownBy(() -> gemFireCache.registerPdxMetaData(new Object()))
+          .isInstanceOf(SerializationException.class)
+          .hasMessage("Serialization failed")
+          .hasCauseInstanceOf(NotSerializableException.class);
+    } finally {
+      gemFireCache.close();
+    }
+  }
+
+  @Test
+  public void registerPdxMetaDataThrowsIfInstanceIsNotPDX() {
+    GemFireCacheImpl gemFireCache = GemFireCacheImpl.createWithAsyncEventListeners(
+        Fakes.distributedSystem(), new CacheConfig(), mock(TypeRegistry.class));
+    try {
+      // A String is expected to serialize without error, just not as PDX, so the failure
+      // should carry the dedicated "not PDX serializable" message instead of an I/O cause.
+      assertThatThrownBy(() -> gemFireCache.registerPdxMetaData("string"))
+          .isInstanceOf(SerializationException.class)
+          .hasMessage("The instance is not PDX serializable");
+    } finally {
+      gemFireCache.close();
+    }
+  }
+
+  @Test
   public void checkThatAsyncEventListenersUseAllThreadsInPool() {
     InternalDistributedSystem ds = Fakes.distributedSystem();
     CacheConfig cc = new CacheConfig();

-- 
To stop receiving notification emails like this one, please contact
dschneider@apache.org.