You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2018/03/23 23:16:19 UTC

[geode] branch develop updated: GEODE-4894: Changes are made to support case sensitivity between region, table name and pdx field, column name (#1663)

This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new da51fce  GEODE-4894: Changes are made to support case sensitivity between region, table name and pdx field, column name (#1663)
da51fce is described below

commit da51fceefce757df055e3baaf8a4764a04b5ff66
Author: agingade <ag...@pivotal.io>
AuthorDate: Fri Mar 23 16:16:15 2018 -0700

    GEODE-4894: Changes are made to support case sensitivity between region, table name and pdx field, column name (#1663)
    
    * meta-data is now used to compute the column name
    * reads will now consult the pdx registry for an existing pdx type to use its field name.
    * Convert field name to lowercase only when column name is uppercase.
    * Table name lookup is changes to look for exact match first, than case insensitive match.
    * Field to column name lookup is changed to find exact column name or case insensitive match in the metadata.
---
 .../connectors/jdbc/internal/RegionMapping.java    | 194 +++++++++++---
 .../geode/connectors/jdbc/internal/SqlHandler.java |  41 ++-
 .../jdbc/internal/SqlStatementFactory.java         |  14 +-
 .../jdbc/internal/SqlToPdxInstanceCreator.java     |  15 +-
 .../connectors/jdbc/internal/TableMetaData.java    |  29 ++-
 .../jdbc/internal/TableMetaDataManager.java        |  24 +-
 .../jdbc/internal/TableMetaDataView.java           |   6 +
 .../sanctioned-geode-connectors-serializables.txt  |   2 +-
 .../jdbc/ClassWithSupportedPdxFields.java          |  18 +-
 .../geode/connectors/jdbc/JdbcDUnitTest.java       |  42 +--
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |   5 +
 .../jdbc/internal/RegionMappingBuilderTest.java    |  16 +-
 .../jdbc/internal/RegionMappingTest.java           | 283 +++++++++++++++++++--
 .../connectors/jdbc/internal/SqlHandlerTest.java   |  24 +-
 .../jdbc/internal/SqlStatementFactoryTest.java     |  17 +-
 .../jdbc/internal/SqlToPdxInstanceCreatorTest.java |  30 ++-
 .../TableMetaDataManagerIntegrationTest.java       |   4 +-
 .../jdbc/internal/TableMetaDataManagerTest.java    |  78 ++++--
 .../cli/CreateMappingCommandIntegrationTest.java   |   8 +-
 .../cli/JdbcClusterConfigDistributedTest.java      |   8 +-
 .../jdbc/internal/xml/ElementTypeTest.java         |   4 +-
 .../geode/pdx/internal/ClientTypeRegistration.java |  16 ++
 .../geode/pdx/internal/LonerTypeRegistration.java  |   6 +
 .../geode/pdx/internal/NullTypeRegistration.java   |   6 +
 .../geode/pdx/internal/PeerTypeRegistration.java   |  32 ++-
 .../geode/pdx/internal/TypeRegistration.java       |   9 +-
 .../apache/geode/pdx/internal/TypeRegistry.java    |   9 +
 .../geode/cache/query/dunit/PdxQueryDUnitTest.java |   8 +-
 28 files changed, 725 insertions(+), 223 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
index d87c4d5..b466443 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
@@ -17,9 +17,16 @@ package org.apache.geode.connectors.jdbc.internal;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.geode.annotations.Experimental;
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.pdx.internal.PdxType;
+import org.apache.geode.pdx.internal.TypeRegistry;
 
 @Experimental
 public class RegionMapping implements Serializable {
@@ -28,37 +35,45 @@ public class RegionMapping implements Serializable {
   private final String tableName;
   private final String connectionConfigName;
   private final Boolean primaryKeyInValue;
-  private final Map<String, String> fieldToColumnMap;
-  private final Map<String, String> columnToFieldMap;
+  private final ConcurrentMap<String, String> fieldToColumnMap;
+  private final ConcurrentMap<String, String> columnToFieldMap;
+
+  private final Map<String, String> configuredFieldToColumnMap;
+  private final Map<String, String> configuredColumnToFieldMap;
 
   public RegionMapping(String regionName, String pdxClassName, String tableName,
       String connectionConfigName, Boolean primaryKeyInValue,
-      Map<String, String> fieldToColumnMap) {
+      Map<String, String> configuredFieldToColumnMap) {
     this.regionName = regionName;
     this.pdxClassName = pdxClassName;
     this.tableName = tableName;
     this.connectionConfigName = connectionConfigName;
     this.primaryKeyInValue = primaryKeyInValue;
-    this.fieldToColumnMap =
-        fieldToColumnMap == null ? null : Collections.unmodifiableMap(fieldToColumnMap);
-    this.columnToFieldMap = createReverseMap(fieldToColumnMap);
+    this.fieldToColumnMap = new ConcurrentHashMap<>();
+    this.columnToFieldMap = new ConcurrentHashMap<>();
+    if (configuredFieldToColumnMap != null) {
+      this.configuredFieldToColumnMap =
+          Collections.unmodifiableMap(new HashMap<>(configuredFieldToColumnMap));
+      this.configuredColumnToFieldMap =
+          Collections.unmodifiableMap(createReverseMap(configuredFieldToColumnMap));
+    } else {
+      this.configuredFieldToColumnMap = null;
+      this.configuredColumnToFieldMap = null;
+    }
   }
 
-  private static Map<String, String> createReverseMap(Map<String, String> fieldToColumnMap) {
-    if (fieldToColumnMap == null) {
-      return null;
-    }
-    Map<String, String> reverseMap = new HashMap<>();
-    for (Map.Entry<String, String> entry : fieldToColumnMap.entrySet()) {
-      String reverseMapKey = entry.getValue().toLowerCase();
+  private static Map<String, String> createReverseMap(Map<String, String> input) {
+    Map<String, String> output = new HashMap<>();
+    for (Map.Entry<String, String> entry : input.entrySet()) {
+      String reverseMapKey = entry.getValue();
       String reverseMapValue = entry.getKey();
-      if (reverseMap.containsKey(reverseMapKey)) {
+      if (output.containsKey(reverseMapKey)) {
         throw new IllegalArgumentException(
             "The field " + reverseMapValue + " can not be mapped to more than one column.");
       }
-      reverseMap.put(reverseMapKey, reverseMapValue);
+      output.put(reverseMapKey, reverseMapValue);
     }
-    return Collections.unmodifiableMap(reverseMap);
+    return output;
   }
 
   public String getConnectionConfigName() {
@@ -88,32 +103,136 @@ public class RegionMapping implements Serializable {
     return tableName;
   }
 
-  public String getColumnNameForField(String fieldName) {
-    String columnName = null;
-    if (fieldToColumnMap != null) {
-      columnName = fieldToColumnMap.get(fieldName);
+  private String getConfiguredColumnNameForField(String fieldName) {
+    String result = fieldName;
+    if (configuredFieldToColumnMap != null) {
+      String mapResult = configuredFieldToColumnMap.get(fieldName);
+      if (mapResult != null) {
+        result = mapResult;
+      }
+    }
+    return result;
+  }
+
+  public String getColumnNameForField(String fieldName, TableMetaDataView tableMetaDataView) {
+    String columnName = fieldToColumnMap.get(fieldName);
+    if (columnName == null) {
+      String configuredColumnName = getConfiguredColumnNameForField(fieldName);
+      Set<String> columnNames = tableMetaDataView.getColumnNames();
+      if (columnNames.contains(configuredColumnName)) {
+        // exact match
+        columnName = configuredColumnName;
+      } else {
+        for (String candidate : columnNames) {
+          if (candidate.equalsIgnoreCase(configuredColumnName)) {
+            if (columnName != null) {
+              throw new JdbcConnectorException(
+                  "The SQL table has at least two columns that match the PDX field: " + fieldName);
+            }
+            columnName = candidate;
+          }
+        }
+      }
+
+      if (columnName == null) {
+        columnName = configuredColumnName;
+      }
+      fieldToColumnMap.put(fieldName, columnName);
+      columnToFieldMap.put(columnName, fieldName);
     }
-    return columnName != null ? columnName : fieldName;
+    return columnName;
   }
 
-  public String getFieldNameForColumn(String columnName) {
-    String canonicalColumnName = columnName.toLowerCase();
-    String fieldName = null;
-    if (this.columnToFieldMap != null) {
-      fieldName = columnToFieldMap.get(canonicalColumnName);
+  private String getConfiguredFieldNameForColumn(String columnName) {
+    String result = columnName;
+    if (configuredColumnToFieldMap != null) {
+      String mapResult = configuredColumnToFieldMap.get(columnName);
+      if (mapResult != null) {
+        result = mapResult;
+      }
     }
-    return fieldName != null ? fieldName : canonicalColumnName;
+    return result;
   }
 
-  public Map<String, String> getFieldToColumnMap() {
-    return fieldToColumnMap;
+  public String getFieldNameForColumn(String columnName, TypeRegistry typeRegistry) {
+    String fieldName = columnToFieldMap.get(columnName);
+    if (fieldName == null) {
+      String configuredFieldName = getConfiguredFieldNameForColumn(columnName);
+      if (getPdxClassName() == null) {
+        if (configuredFieldName.equals(configuredFieldName.toUpperCase())) {
+          fieldName = configuredFieldName.toLowerCase();
+        } else {
+          fieldName = configuredFieldName;
+        }
+      } else {
+        Set<PdxType> pdxTypes = getPdxTypesForClassName(typeRegistry);
+        fieldName = findExactMatch(configuredFieldName, pdxTypes);
+        if (fieldName == null) {
+          fieldName = findCaseInsensitiveMatch(columnName, configuredFieldName, pdxTypes);
+        }
+      }
+      assert fieldName != null;
+      fieldToColumnMap.put(fieldName, columnName);
+      columnToFieldMap.put(columnName, fieldName);
+    }
+    return fieldName;
+  }
+
+  private Set<PdxType> getPdxTypesForClassName(TypeRegistry typeRegistry) {
+    Set<PdxType> pdxTypes = typeRegistry.getPdxTypesForClassName(getPdxClassName());
+    if (pdxTypes.isEmpty()) {
+      throw new JdbcConnectorException(
+          "The class " + getPdxClassName() + " has not been pdx serialized.");
+    }
+    return pdxTypes;
+  }
+
+  /**
+   * Given a column name and a set of pdx types, find the field name in those types that match,
+   * ignoring case, the column name.
+   *
+   * @return the matching field name or null if no match
+   * @throws JdbcConnectorException if no fields match
+   * @throws JdbcConnectorException if more than one field matches
+   */
+  private String findCaseInsensitiveMatch(String columnName, String configuredFieldName,
+      Set<PdxType> pdxTypes) {
+    HashSet<String> matchingFieldNames = new HashSet<>();
+    for (PdxType pdxType : pdxTypes) {
+      for (String existingFieldName : pdxType.getFieldNames()) {
+        if (existingFieldName.equalsIgnoreCase(configuredFieldName)) {
+          matchingFieldNames.add(existingFieldName);
+        }
+      }
+    }
+    if (matchingFieldNames.isEmpty()) {
+      throw new JdbcConnectorException("The class " + getPdxClassName()
+          + " does not have a field that matches the column " + columnName);
+    } else if (matchingFieldNames.size() > 1) {
+      throw new JdbcConnectorException(
+          "Could not determine what pdx field to use for the column name " + columnName
+              + " because the pdx fields " + matchingFieldNames + " all match it.");
+    }
+    return matchingFieldNames.iterator().next();
   }
 
   /**
-   * For unit tests
+   * Given a column name, search the given pdxTypes for a field whose name exactly matches the
+   * column name.
+   *
+   * @return the matching field name or null if no match
    */
-  Map<String, String> getColumnToFieldMap() {
-    return this.columnToFieldMap;
+  private String findExactMatch(String columnName, Set<PdxType> pdxTypes) {
+    for (PdxType pdxType : pdxTypes) {
+      if (pdxType.getPdxField(columnName) != null) {
+        return columnName;
+      }
+    }
+    return null;
+  }
+
+  public Map<String, String> getFieldToColumnMap() {
+    return configuredFieldToColumnMap;
   }
 
   @Override
@@ -144,8 +263,10 @@ public class RegionMapping implements Serializable {
         : that.connectionConfigName != null) {
       return false;
     }
-    return fieldToColumnMap != null ? fieldToColumnMap.equals(that.fieldToColumnMap)
-        : that.fieldToColumnMap == null;
+
+    return (configuredFieldToColumnMap != null
+        ? configuredFieldToColumnMap.equals(that.configuredFieldToColumnMap)
+        : that.configuredFieldToColumnMap == null);
   }
 
   @Override
@@ -155,7 +276,8 @@ public class RegionMapping implements Serializable {
     result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
     result = 31 * result + (connectionConfigName != null ? connectionConfigName.hashCode() : 0);
     result = 31 * result + (primaryKeyInValue ? 1 : 0);
-    result = 31 * result + (fieldToColumnMap != null ? fieldToColumnMap.hashCode() : 0);
+    result = 31 * result
+        + (configuredFieldToColumnMap != null ? configuredFieldToColumnMap.hashCode() : 0);
     return result;
   }
 
@@ -164,6 +286,6 @@ public class RegionMapping implements Serializable {
     return "RegionMapping{" + "regionName='" + regionName + '\'' + ", pdxClassName='" + pdxClassName
         + '\'' + ", tableName='" + tableName + '\'' + ", connectionConfigName='"
         + connectionConfigName + '\'' + ", primaryKeyInValue=" + primaryKeyInValue
-        + ", fieldToColumnMap=" + fieldToColumnMap + '}';
+        + ", fieldToColumnMap=" + configuredFieldToColumnMap + '}';
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index e277254..94a792b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -59,15 +59,17 @@ public class SqlHandler {
     RegionMapping regionMapping = getMappingForRegion(region.getName());
     ConnectionConfiguration connectionConfig =
         getConnectionConfig(regionMapping.getConnectionConfigName());
-    String tableName = regionMapping.getRegionToTableName();
     PdxInstance result;
     try (Connection connection = getConnection(connectionConfig)) {
+      TableMetaDataView tableMetaData = this.tableMetaDataManager.getTableMetaDataView(connection,
+          regionMapping.getRegionToTableName());
+      String realTableName = tableMetaData.getTableName();
       List<ColumnValue> columnList =
-          getColumnToValueList(connection, regionMapping, key, null, Operation.GET);
+          getColumnToValueList(tableMetaData, regionMapping, key, null, Operation.GET);
       try (PreparedStatement statement =
-          getPreparedStatement(connection, columnList, tableName, Operation.GET)) {
+          getPreparedStatement(connection, columnList, realTableName, Operation.GET)) {
         try (ResultSet resultSet = executeReadQuery(statement, columnList)) {
-          String keyColumnName = getKeyColumnName(connection, tableName);
+          String keyColumnName = tableMetaData.getKeyColumnName();
           InternalCache cache = (InternalCache) region.getRegionService();
           SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
               new SqlToPdxInstanceCreator(cache, regionMapping, resultSet, keyColumnName);
@@ -84,7 +86,6 @@ public class SqlHandler {
     return statement.executeQuery();
   }
 
-
   private RegionMapping getMappingForRegion(String regionName) {
     RegionMapping regionMapping = this.configService.getMappingForRegion(regionName);
     if (regionMapping == null) {
@@ -104,10 +105,6 @@ public class SqlHandler {
     return connectionConfig;
   }
 
-  private String getKeyColumnName(Connection connection, String tableName) {
-    return this.tableMetaDataManager.getTableMetaDataView(connection, tableName).getKeyColumnName();
-  }
-
   private void setValuesInStatement(PreparedStatement statement, List<ColumnValue> columnList)
       throws SQLException {
     int index = 0;
@@ -134,14 +131,15 @@ public class SqlHandler {
     ConnectionConfiguration connectionConfig =
         getConnectionConfig(regionMapping.getConnectionConfigName());
 
-    String tableName = regionMapping.getRegionToTableName();
-
     try (Connection connection = getConnection(connectionConfig)) {
+      TableMetaDataView tableMetaData = this.tableMetaDataManager.getTableMetaDataView(connection,
+          regionMapping.getRegionToTableName());
+      String realTableName = tableMetaData.getTableName();
       List<ColumnValue> columnList =
-          getColumnToValueList(connection, regionMapping, key, value, operation);
+          getColumnToValueList(tableMetaData, regionMapping, key, value, operation);
       int updateCount = 0;
       try (PreparedStatement statement =
-          getPreparedStatement(connection, columnList, tableName, operation)) {
+          getPreparedStatement(connection, columnList, realTableName, operation)) {
         updateCount = executeWriteStatement(statement, columnList);
       } catch (SQLException e) {
         if (operation.isDestroy()) {
@@ -157,7 +155,7 @@ public class SqlHandler {
       if (updateCount <= 0) {
         Operation upsertOp = getOppositeOperation(operation);
         try (PreparedStatement upsertStatement =
-            getPreparedStatement(connection, columnList, tableName, upsertOp)) {
+            getPreparedStatement(connection, columnList, realTableName, upsertOp)) {
           updateCount = executeWriteStatement(upsertStatement, columnList);
         }
       }
@@ -197,11 +195,8 @@ public class SqlHandler {
     }
   }
 
-  <K> List<ColumnValue> getColumnToValueList(Connection connection, RegionMapping regionMapping,
-      K key, PdxInstance value, Operation operation) {
-    String tableName = regionMapping.getRegionToTableName();
-    TableMetaDataView tableMetaData =
-        this.tableMetaDataManager.getTableMetaDataView(connection, tableName);
+  <K> List<ColumnValue> getColumnToValueList(TableMetaDataView tableMetaData,
+      RegionMapping regionMapping, K key, PdxInstance value, Operation operation) {
     String keyColumnName = tableMetaData.getKeyColumnName();
     ColumnValue keyColumnValue =
         new ColumnValue(true, keyColumnName, key, tableMetaData.getColumnDataType(keyColumnName));
@@ -210,17 +205,17 @@ public class SqlHandler {
       return Collections.singletonList(keyColumnValue);
     }
 
-    List<ColumnValue> result =
-        createColumnValueList(tableMetaData, regionMapping, value, keyColumnName);
+    List<ColumnValue> result = createColumnValueList(tableMetaData, regionMapping, value);
     result.add(keyColumnValue);
     return result;
   }
 
   private List<ColumnValue> createColumnValueList(TableMetaDataView tableMetaData,
-      RegionMapping regionMapping, PdxInstance value, String keyColumnName) {
+      RegionMapping regionMapping, PdxInstance value) {
+    final String keyColumnName = tableMetaData.getKeyColumnName();
     List<ColumnValue> result = new ArrayList<>();
     for (String fieldName : value.getFieldNames()) {
-      String columnName = regionMapping.getColumnNameForField(fieldName);
+      String columnName = regionMapping.getColumnNameForField(fieldName, tableMetaData);
       if (columnName.equalsIgnoreCase(keyColumnName)) {
         continue;
       }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
index d5367ef..ac787b8 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
@@ -22,18 +22,18 @@ class SqlStatementFactory {
     assert columnList.size() == 1;
     ColumnValue keyCV = columnList.get(0);
     assert keyCV.isKey();
-    return "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+    return "SELECT * FROM \"" + tableName + "\" WHERE \"" + keyCV.getColumnName() + "\" = ?";
   }
 
   String createDestroySqlString(String tableName, List<ColumnValue> columnList) {
     assert columnList.size() == 1;
     ColumnValue keyCV = columnList.get(0);
     assert keyCV.isKey();
-    return "DELETE FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+    return "DELETE FROM \"" + tableName + "\" WHERE \"" + keyCV.getColumnName() + "\" = ?";
   }
 
   String createUpdateSqlString(String tableName, List<ColumnValue> columnList) {
-    StringBuilder query = new StringBuilder("UPDATE " + tableName + " SET ");
+    StringBuilder query = new StringBuilder("UPDATE \"" + tableName + "\" SET ");
     int idx = 0;
     for (ColumnValue column : columnList) {
       if (!column.isKey()) {
@@ -41,14 +41,14 @@ class SqlStatementFactory {
         if (idx > 1) {
           query.append(", ");
         }
-        query.append(column.getColumnName());
+        query.append('"').append(column.getColumnName()).append('"');
         query.append(" = ?");
       }
     }
     for (ColumnValue column : columnList) {
       if (column.isKey()) {
         query.append(" WHERE ");
-        query.append(column.getColumnName());
+        query.append('"').append(column.getColumnName()).append('"');
         query.append(" = ?");
         // currently only support simple primary key with one column
         break;
@@ -58,13 +58,13 @@ class SqlStatementFactory {
   }
 
   String createInsertSqlString(String tableName, List<ColumnValue> columnList) {
-    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + " (");
+    StringBuilder columnNames = new StringBuilder("INSERT INTO \"" + tableName + "\" (");
     StringBuilder columnValues = new StringBuilder(" VALUES (");
     int columnCount = columnList.size();
     int idx = 0;
     for (ColumnValue column : columnList) {
       idx++;
-      columnNames.append(column.getColumnName());
+      columnNames.append('"').append(column.getColumnName()).append('"');
       columnValues.append('?');
       if (idx != columnCount) {
         columnNames.append(", ");
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
index fd0480b..1d6a816 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
@@ -25,6 +25,7 @@ import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.internal.PdxField;
 import org.apache.geode.pdx.internal.PdxType;
+import org.apache.geode.pdx.internal.TypeRegistry;
 
 class SqlToPdxInstanceCreator {
   private final InternalCache cache;
@@ -46,11 +47,13 @@ class SqlToPdxInstanceCreator {
     if (resultSet.next()) {
       ResultSetMetaData metaData = resultSet.getMetaData();
       int ColumnsNumber = metaData.getColumnCount();
+      TypeRegistry typeRegistry = cache.getPdxRegistry();
       for (int i = 1; i <= ColumnsNumber; i++) {
         String columnName = metaData.getColumnName(i);
         if (regionMapping.isPrimaryKeyInValue() || !keyColumnName.equalsIgnoreCase(columnName)) {
-          String fieldName = mapColumnNameToFieldName(columnName, regionMapping);
-          FieldType fieldType = getFieldType(cache, regionMapping.getPdxClassName(), fieldName);
+          String fieldName = regionMapping.getFieldNameForColumn(columnName, typeRegistry);
+          FieldType fieldType =
+              getFieldType(typeRegistry, regionMapping.getPdxClassName(), fieldName);
           writeField(factory, resultSet, i, fieldName, fieldType);
         }
       }
@@ -63,10 +66,6 @@ class SqlToPdxInstanceCreator {
     return pdxInstance;
   }
 
-  private String mapColumnNameToFieldName(String columnName, RegionMapping regionMapping) {
-    return regionMapping.getFieldNameForColumn(columnName);
-  }
-
   private PdxInstanceFactory getPdxInstanceFactory(InternalCache cache,
       RegionMapping regionMapping) {
     String valueClassName = regionMapping.getPdxClassName();
@@ -184,12 +183,12 @@ class SqlToPdxInstanceCreator {
     }
   }
 
-  private FieldType getFieldType(InternalCache cache, String pdxClassName, String fieldName) {
+  private FieldType getFieldType(TypeRegistry typeRegistry, String pdxClassName, String fieldName) {
     if (pdxClassName == null) {
       return FieldType.OBJECT;
     }
 
-    PdxType pdxType = cache.getPdxRegistry().getPdxTypeForField(fieldName, pdxClassName);
+    PdxType pdxType = typeRegistry.getPdxTypeForField(fieldName, pdxClassName);
     if (pdxType != null) {
       PdxField pdxField = pdxType.getPdxField(fieldName);
       if (pdxField != null) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
index 12e7682..532bcc4 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
@@ -17,16 +17,23 @@
 package org.apache.geode.connectors.jdbc.internal;
 
 import java.util.HashMap;
-
-import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import java.util.Set;
 
 public class TableMetaData implements TableMetaDataView {
 
+  private final String tableName;
   private final String keyColumnName;
-  private final HashMap<String, Integer> columnNameToTypeMap = new HashMap<>();
+  private final HashMap<String, Integer> columnNameToTypeMap;
 
-  public TableMetaData(String keyColumnName) {
+  public TableMetaData(String tableName, String keyColumnName) {
+    this.tableName = tableName;
     this.keyColumnName = keyColumnName;
+    this.columnNameToTypeMap = new HashMap<>();
+  }
+
+  @Override
+  public String getTableName() {
+    return tableName;
   }
 
   @Override
@@ -36,19 +43,19 @@ public class TableMetaData implements TableMetaDataView {
 
   @Override
   public int getColumnDataType(String columnName) {
-    Integer dataType = this.columnNameToTypeMap.get(columnName.toLowerCase());
+    Integer dataType = this.columnNameToTypeMap.get(columnName);
     if (dataType == null) {
       return 0;
     }
     return dataType;
   }
 
+  @Override
+  public Set<String> getColumnNames() {
+    return columnNameToTypeMap.keySet();
+  }
+
   public void addDataType(String columnName, int dataType) {
-    Integer previousDataType = this.columnNameToTypeMap.put(columnName.toLowerCase(), dataType);
-    if (previousDataType != null) {
-      throw new JdbcConnectorException(
-          "Column names must be different in case. Two columns both have the name "
-              + columnName.toLowerCase());
-    }
+    this.columnNameToTypeMap.put(columnName, dataType);
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
index 4d9a46f..d821380 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
@@ -46,7 +46,7 @@ public class TableMetaDataManager {
       try (ResultSet tables = metaData.getTables(null, null, "%", null)) {
         String realTableName = getTableNameFromMetaData(tableName, tables);
         String key = getPrimaryKeyColumnNameFromMetaData(realTableName, metaData);
-        result = new TableMetaData(key);
+        result = new TableMetaData(realTableName, key);
         getDataTypesFromMetaData(realTableName, metaData, result);
       }
     } catch (SQLException e) {
@@ -56,21 +56,27 @@ public class TableMetaDataManager {
   }
 
   private String getTableNameFromMetaData(String tableName, ResultSet tables) throws SQLException {
-    String realTableName = null;
+    String result = null;
+    int inexactMatches = 0;
+
     while (tables.next()) {
       String name = tables.getString("TABLE_NAME");
-      if (name.equalsIgnoreCase(tableName)) {
-        if (realTableName != null) {
-          throw new JdbcConnectorException("Duplicate tables that match region name");
-        }
-        realTableName = name;
+      if (name.equals(tableName)) {
+        return name;
+      } else if (name.equalsIgnoreCase(tableName)) {
+        inexactMatches++;
+        result = name;
       }
     }
 
-    if (realTableName == null) {
+    if (inexactMatches > 1) {
+      throw new JdbcConnectorException("Duplicate tables that match region name");
+    }
+
+    if (result == null) {
       throw new JdbcConnectorException("no table was found that matches " + tableName);
     }
-    return realTableName;
+    return result;
   }
 
   private String getPrimaryKeyColumnNameFromMetaData(String tableName, DatabaseMetaData metaData)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
index c78c8f7..a929ff6 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
@@ -16,8 +16,14 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
+import java.util.Set;
+
 public interface TableMetaDataView {
+  public String getTableName();
+
   public String getKeyColumnName();
 
   public int getColumnDataType(String columnName);
+
+  public Set<String> getColumnNames();
 }
diff --git a/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt b/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
index 6a6b8be..d35a3b7 100755
--- a/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
+++ b/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
@@ -2,7 +2,7 @@ org/apache/geode/connectors/jdbc/JdbcConnectorException,true,1
 org/apache/geode/connectors/jdbc/internal/ConnectionConfigExistsException,false
 org/apache/geode/connectors/jdbc/internal/ConnectionConfigNotFoundException,false
 org/apache/geode/connectors/jdbc/internal/ConnectionConfiguration,false,name:java/lang/String,parameters:java/util/Map,password:java/lang/String,url:java/lang/String,user:java/lang/String
-org/apache/geode/connectors/jdbc/internal/RegionMapping,false,columnToFieldMap:java/util/Map,connectionConfigName:java/lang/String,fieldToColumnMap:java/util/Map,pdxClassName:java/lang/String,primaryKeyInValue:java/lang/Boolean,regionName:java/lang/String,tableName:java/lang/String
+org/apache/geode/connectors/jdbc/internal/RegionMapping,false,columnToFieldMap:java/util/concurrent/ConcurrentMap,configuredColumnToFieldMap:java/util/Map,configuredFieldToColumnMap:java/util/Map,connectionConfigName:java/lang/String,fieldToColumnMap:java/util/concurrent/ConcurrentMap,pdxClassName:java/lang/String,primaryKeyInValue:java/lang/Boolean,regionName:java/lang/String,tableName:java/lang/String
 org/apache/geode/connectors/jdbc/internal/RegionMappingExistsException,false
 org/apache/geode/connectors/jdbc/internal/RegionMappingNotFoundException,false
 org/apache/geode/connectors/jdbc/internal/cli/AlterConnectionFunction,false
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java
index fdd2b10..02aceff 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java
@@ -23,8 +23,8 @@ import org.apache.geode.internal.PdxSerializerObject;
 
 public class ClassWithSupportedPdxFields implements PdxSerializerObject, Serializable {
   private boolean aboolean;
-  private byte abyte;
-  private short ashort;
+  private byte aByte;
+  private short ASHORT;
   private int anint;
   private long along;
   private float afloat;
@@ -37,12 +37,12 @@ public class ClassWithSupportedPdxFields implements PdxSerializerObject, Seriali
 
   public ClassWithSupportedPdxFields() {}
 
-  public ClassWithSupportedPdxFields(boolean aboolean, byte abyte, short ashort, int anint,
+  public ClassWithSupportedPdxFields(boolean aboolean, byte aByte, short ASHORT, int anint,
       long along, float afloat, double adouble, String astring, Date adate, Object anobject,
       byte[] abytearray, char achar) {
     this.aboolean = aboolean;
-    this.abyte = abyte;
-    this.ashort = ashort;
+    this.aByte = aByte;
+    this.ASHORT = ASHORT;
     this.anint = anint;
     this.along = along;
     this.afloat = afloat;
@@ -124,8 +124,8 @@ public class ClassWithSupportedPdxFields implements PdxSerializerObject, Seriali
 
   @Override
   public String toString() {
-    return "ClassWithSupportedPdxFields{" + "aboolean=" + isAboolean() + ", abyte=" + getAbyte()
-        + ", achar=" + getAchar() + ", ashort=" + getAshort() + ", anint=" + getAnint() + ", along="
+    return "ClassWithSupportedPdxFields{" + "aboolean=" + isAboolean() + ", aByte=" + getAbyte()
+        + ", achar=" + getAchar() + ", ASHORT=" + getAshort() + ", anint=" + getAnint() + ", along="
         + getAlong() + ", afloat=" + getAfloat() + ", adouble=" + getAdouble() + ", astring='"
         + getAstring() + '\'' + ", adate=" + getAdate() + ", anobject=" + getAnobject()
         + ", abytearray=" + Arrays.toString(getAbytearray()) + '}';
@@ -136,7 +136,7 @@ public class ClassWithSupportedPdxFields implements PdxSerializerObject, Seriali
   }
 
   public byte getAbyte() {
-    return abyte;
+    return aByte;
   }
 
   public char getAchar() {
@@ -144,7 +144,7 @@ public class ClassWithSupportedPdxFields implements PdxSerializerObject, Seriali
   }
 
   public short getAshort() {
-    return ashort;
+    return ASHORT;
   }
 
   public int getAnint() {
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
index 7ae1488..7cdd055 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -103,11 +103,12 @@ public class JdbcDUnitTest implements Serializable {
     server.invoke(() -> {
       Connection connection = DriverManager.getConnection(CONNECTION_URL);
       Statement statement = connection.createStatement();
-      statement.execute("Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
-          + "aboolean smallint, " + "abyte smallint, " + "ashort smallint, " + "anint int, "
-          + "along bigint, " + "afloat float, " + "adouble double, " + "astring varchar(10), "
-          + "adate timestamp, " + "anobject varchar(20), " + "abytearray blob(100), "
-          + "achar char(1))");
+      statement
+          .execute("Create Table \"" + TABLE_NAME + "\" (\"id\" varchar(10) primary key not null, "
+              + "aboolean smallint, " + "abyte smallint, " + "ashort smallint, " + "anint int, "
+              + "\"along\" bigint, " + "\"aFloat\" float, " + "\"ADOUBLE\" double, "
+              + "astring varchar(10), " + "adate timestamp, " + "anobject varchar(20), "
+              + "abytearray blob(100), " + "achar char(1))");
     });
   }
 
@@ -115,7 +116,8 @@ public class JdbcDUnitTest implements Serializable {
     server.invoke(() -> {
       Connection connection = DriverManager.getConnection(CONNECTION_URL);
 
-      String insertQuery = "Insert into " + TABLE_NAME + " values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
+      String insertQuery =
+          "Insert into \"" + TABLE_NAME + "\" values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
       System.out.println("### Query is :" + insertQuery);
       PreparedStatement statement = connection.prepareStatement(insertQuery);
       statement.setObject(1, key);
@@ -140,7 +142,8 @@ public class JdbcDUnitTest implements Serializable {
     server.invoke(() -> {
       Connection connection = DriverManager.getConnection(CONNECTION_URL);
 
-      String insertQuery = "Insert into " + TABLE_NAME + " values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
+      String insertQuery =
+          "Insert into \"" + TABLE_NAME + "\" values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
       System.out.println("### Query is :" + insertQuery);
       PreparedStatement statement = connection.prepareStatement(insertQuery);
       statement.setObject(1, key);
@@ -168,19 +171,19 @@ public class JdbcDUnitTest implements Serializable {
     });
   }
 
-  private void closeDB() throws Exception {
-    try {
-      Connection connection = DriverManager.getConnection(CONNECTION_URL);
-      Statement statement = connection.createStatement();
-      if (statement == null) {
-        statement = connection.createStatement();
+  private void closeDB() throws SQLException {
+    try (Connection connection = DriverManager.getConnection(CONNECTION_URL)) {
+      try (Statement statement = connection.createStatement()) {
+        try {
+          statement.execute("Drop table " + TABLE_NAME);
+        } catch (SQLException ignore) {
+        }
+
+        try {
+          statement.execute("Drop table \"" + TABLE_NAME + "\"");
+        } catch (SQLException ignore) {
+        }
       }
-      statement.execute("Drop table " + TABLE_NAME);
-      statement.close();
-
-      connection.close();
-    } catch (SQLException ex) {
-      System.out.println("SQL Exception is thrown while closing the database.");
     }
   }
 
@@ -367,6 +370,7 @@ public class JdbcDUnitTest implements Serializable {
     });
   }
 
+  @Test
   public void getReadsFromDBWithPdxClassName() throws Exception {
     createTable();
     createRegionUsingGfsh(true, false, true);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index dc92514..eb44dee 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -84,6 +84,11 @@ public class JdbcLoaderIntegrationTest {
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
   }
 
+  private void createEmployeeTableWithQuotes() throws Exception {
+    statement.execute("Create Table \"" + REGION_TABLE_NAME
+        + "\" (\"id\" varchar(10) primary key not null, \"name\" varchar(10), \"age\" int, \"Age\" int)");
+  }
+
   private void createClassWithSupportedPdxFieldsTable() throws Exception {
     statement.execute("Create Table " + REGION_TABLE_NAME
         + " (id varchar(10) primary key not null, " + "aboolean smallint, " + "abyte smallint, "
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingBuilderTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingBuilderTest.java
index 5abfd74..ba4edca 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingBuilderTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingBuilderTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.connectors.jdbc.internal;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -50,7 +51,8 @@ public class RegionMappingBuilderTest {
     assertThat(regionMapping.getPdxClassName()).isEqualTo("pdxClassName");
     assertThat(regionMapping.getConnectionConfigName()).isEqualTo("configName");
     assertThat(regionMapping.isPrimaryKeyInValue()).isTrue();
-    assertThat(regionMapping.getColumnNameForField("fieldName")).isEqualTo("columnName");
+    assertThat(regionMapping.getColumnNameForField("fieldName", mock(TableMetaDataView.class)))
+        .isEqualTo("columnName");
   }
 
   @Test
@@ -79,8 +81,10 @@ public class RegionMappingBuilderTest {
     RegionMapping regionMapping =
         new RegionMappingBuilder().withFieldToColumnMappings(fieldMappings).build();
 
-    assertThat(regionMapping.getColumnNameForField("field1")).isEqualTo("column1");
-    assertThat(regionMapping.getColumnNameForField("field2")).isEqualTo("column2");
+    assertThat(regionMapping.getColumnNameForField("field1", mock(TableMetaDataView.class)))
+        .isEqualTo("column1");
+    assertThat(regionMapping.getColumnNameForField("field2", mock(TableMetaDataView.class)))
+        .isEqualTo("column2");
   }
 
   @Test
@@ -89,8 +93,10 @@ public class RegionMappingBuilderTest {
     RegionMapping regionMapping =
         new RegionMappingBuilder().withFieldToColumnMappings(fieldMappings).build();
 
-    assertThat(regionMapping.getColumnNameForField("field1")).isEqualTo("column1");
-    assertThat(regionMapping.getColumnNameForField("field2")).isEqualTo("column2");
+    assertThat(regionMapping.getColumnNameForField("field1", mock(TableMetaDataView.class)))
+        .isEqualTo("column1");
+    assertThat(regionMapping.getColumnNameForField("field2", mock(TableMetaDataView.class)))
+        .isEqualTo("column2");
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
index 992d0c4..363adcf 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
@@ -15,8 +15,16 @@
 package org.apache.geode.connectors.jdbc.internal;
 
 import static org.assertj.core.api.Assertions.assertThat;
-
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import org.junit.Before;
@@ -25,6 +33,10 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.pdx.internal.PdxField;
+import org.apache.geode.pdx.internal.PdxType;
+import org.apache.geode.pdx.internal.TypeRegistry;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -58,15 +70,17 @@ public class RegionMappingTest {
   @Test
   public void initiatedWithNullValues() {
     mapping = new RegionMapping(null, null, null, null, false, null);
+
     assertThat(mapping.getTableName()).isNull();
     assertThat(mapping.getRegionName()).isNull();
     assertThat(mapping.getConnectionConfigName()).isNull();
     assertThat(mapping.getPdxClassName()).isNull();
     assertThat(mapping.getFieldToColumnMap()).isNull();
-    assertThat(mapping.getColumnToFieldMap()).isNull();
     assertThat(mapping.getRegionToTableName()).isNull();
-    assertThat(mapping.getColumnNameForField("fieldName")).isEqualTo("fieldName");
-    assertThat(mapping.getFieldNameForColumn("columnName")).isEqualTo("columnname");
+    assertThat(mapping.getColumnNameForField("fieldName", mock(TableMetaDataView.class)))
+        .isEqualTo("fieldName");
+    assertThat(mapping.getFieldNameForColumn("columnName", mock(TypeRegistry.class)))
+        .isEqualTo("columnName");
   }
 
   @Test
@@ -116,21 +130,170 @@ public class RegionMappingTest {
   }
 
   @Test
-  public void returnsFieldNameIfColumnNotMapped() {
+  public void returnsColumnNameIfFieldNotMapped() {
     fieldMap.put("otherField", "column");
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    Map<String, String> expectedFieldMap = new HashMap<>(fieldMap);
+    expectedFieldMap.put(fieldName1, fieldName1);
+    Map<String, String> expectedColumnMap = new HashMap<>();
+    expectedColumnMap.put("column", "otherField");
+    expectedColumnMap.put(fieldName1, fieldName1);
+
+    String columnName = mapping.getColumnNameForField(fieldName1, mock(TableMetaDataView.class));
+
+    assertThat(columnName).isEqualTo(fieldName1);
+  }
 
+  @Test
+  public void returnsColumnNameFromTableMetaDataIfFieldNotMappedAndMetaDataMatchesWithCaseDiffering() {
+    fieldMap.put("otherField", "column");
+    String metaDataColumnName = fieldName1.toUpperCase();
     mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+    when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton(metaDataColumnName));
 
-    assertThat(mapping.getColumnNameForField(fieldName1)).isEqualTo(fieldName1);
+    assertThat(mapping.getColumnNameForField(fieldName1, tableMetaDataView))
+        .isEqualTo(metaDataColumnName);
   }
 
   @Test
-  public void returnsColumnNameIfFieldNotMapped() {
+  public void returnsColumnNameFromTableMetaDataIfFieldNotMappedAndMetaDataMatchesExactly() {
     fieldMap.put("otherField", "column");
+    String metaDataColumnName = fieldName1;
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+    when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton(metaDataColumnName));
 
+    assertThat(mapping.getColumnNameForField(fieldName1, tableMetaDataView))
+        .isEqualTo(metaDataColumnName);
+  }
+
+  @Test
+  public void returnsColumnNameIfFieldNotMappedAndNotInMetaData() {
+    fieldMap.put("otherField", "column");
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+    when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton("does not match"));
+
+    assertThat(mapping.getColumnNameForField(fieldName1, tableMetaDataView)).isEqualTo(fieldName1);
+  }
+
+  @Test
+  public void getColumnNameForFieldThrowsIfTwoColumnsMatchField() {
+    fieldMap.put("otherField", "column");
     mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+    HashSet<String> columnNames =
+        new HashSet<>(Arrays.asList(fieldName1.toUpperCase(), fieldName1.toLowerCase()));
+    when(tableMetaDataView.getColumnNames()).thenReturn(columnNames);
+
+    expectedException.expect(JdbcConnectorException.class);
+    expectedException
+        .expectMessage("The SQL table has at least two columns that match the PDX field: myField1");
+    mapping.getColumnNameForField(fieldName1, tableMetaDataView);
+
+  }
+
+  @Test
+  public void ifMixedCaseColumnNameNotMappedReturnsItAsFieldName() {
+    fieldMap.put("otherField", "column");
 
-    assertThat(mapping.getFieldNameForColumn("columnName")).isEqualTo("columnname");
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+
+    assertThat(mapping.getFieldNameForColumn("columnName", null)).isEqualTo("columnName");
+  }
+
+  @Test
+  public void ifLowerCaseColumnNameNotMappedReturnsItAsFieldName() {
+    fieldMap.put("otherField", "column");
+
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+
+    assertThat(mapping.getFieldNameForColumn("columnname", null)).isEqualTo("columnname");
+  }
+
+  @Test
+  public void ifUpperCaseColumnNameNotMappedReturnsItLowerCasedAsFieldName() {
+    fieldMap.put("otherField", "column");
+
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+
+    assertThat(mapping.getFieldNameForColumn("COLUMNNAME", null)).isEqualTo("columnname");
+  }
+
+
+  @Test
+  public void throwsIfColumnNotMappedAndPdxClassNameDoesNotExist() {
+    mapping = new RegionMapping(null, "pdxClassName", null, null, true, null);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    when(typeRegistry.getPdxTypesForClassName("pdxClassName")).thenReturn(Collections.emptySet());
+    expectedException.expect(JdbcConnectorException.class);
+    expectedException.expectMessage("The class pdxClassName has not been pdx serialized.");
+
+    mapping.getFieldNameForColumn("columnName", typeRegistry);
+  }
+
+  @Test
+  public void throwsIfColumnNotMappedAndPdxClassNameDoesExistButHasNoMatchingFields() {
+    String pdxClassName = "pdxClassName";
+    String columnName = "columnName";
+    mapping = new RegionMapping(null, pdxClassName, null, null, true, null);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    HashSet<PdxType> pdxTypes = new HashSet<>(Arrays.asList(mock(PdxType.class)));
+    when(typeRegistry.getPdxTypesForClassName(pdxClassName)).thenReturn(pdxTypes);
+    expectedException.expect(JdbcConnectorException.class);
+    expectedException.expectMessage("The class " + pdxClassName
+        + " does not have a field that matches the column " + columnName);
+
+    mapping.getFieldNameForColumn(columnName, typeRegistry);
+  }
+
+  @Test
+  public void throwsIfColumnNotMappedAndPdxClassNameDoesExistButHasMoreThanOneMatchingFields() {
+    String pdxClassName = "pdxClassName";
+    String columnName = "columnName";
+    mapping = new RegionMapping(null, pdxClassName, null, null, true, null);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    PdxType pdxType = mock(PdxType.class);
+    when(pdxType.getFieldNames())
+        .thenReturn(Arrays.asList(columnName.toLowerCase(), columnName.toUpperCase()));
+    HashSet<PdxType> pdxTypes = new HashSet<>(Arrays.asList(pdxType));
+    when(typeRegistry.getPdxTypesForClassName(pdxClassName)).thenReturn(pdxTypes);
+    expectedException.expect(JdbcConnectorException.class);
+    expectedException.expectMessage(
+        "Could not determine what pdx field to use for the column name " + columnName);
+
+    mapping.getFieldNameForColumn(columnName, typeRegistry);
+  }
+
+  @Test
+  public void returnsIfColumnNotMappedAndPdxClassNameDoesExistAndHasOneFieldThatInexactlyMatches() {
+    String pdxClassName = "pdxClassName";
+    String columnName = "columnName";
+    mapping = new RegionMapping(null, pdxClassName, null, null, true, null);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    PdxType pdxType = mock(PdxType.class);
+    when(pdxType.getFieldNames())
+        .thenReturn(Arrays.asList("someOtherField", columnName.toUpperCase()));
+    HashSet<PdxType> pdxTypes = new HashSet<>(Arrays.asList(pdxType));
+    when(typeRegistry.getPdxTypesForClassName(pdxClassName)).thenReturn(pdxTypes);
+
+    assertThat(mapping.getFieldNameForColumn(columnName, typeRegistry))
+        .isEqualTo(columnName.toUpperCase());
+  }
+
+  @Test
+  public void returnsIfColumnNotMappedAndPdxClassNameDoesExistAndHasOneFieldThatExactlyMatches() {
+    String pdxClassName = "pdxClassName";
+    String columnName = "columnName";
+    mapping = new RegionMapping(null, pdxClassName, null, null, true, null);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    PdxType pdxType = mock(PdxType.class);
+    when(pdxType.getPdxField(columnName)).thenReturn(mock(PdxField.class));
+    HashSet<PdxType> pdxTypes = new HashSet<>(Arrays.asList(pdxType));
+    when(typeRegistry.getPdxTypesForClassName(pdxClassName)).thenReturn(pdxTypes);
+
+    assertThat(mapping.getFieldNameForColumn(columnName, typeRegistry)).isEqualTo(columnName);
   }
 
   @Test
@@ -139,7 +302,18 @@ public class RegionMappingTest {
 
     mapping = new RegionMapping(null, null, null, null, true, fieldMap);
 
-    assertThat(mapping.getColumnNameForField(fieldName1)).isEqualTo(columnName1);
+    assertThat(mapping.getColumnNameForField(fieldName1, mock(TableMetaDataView.class)))
+        .isEqualTo(columnName1);
+  }
+
+  @Test
+  public void returnsMappedColumnNameForFieldEvenIfMetaDataMatches() {
+    fieldMap.put(fieldName1, columnName1);
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+    when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton(fieldName1));
+
+    assertThat(mapping.getColumnNameForField(fieldName1, tableMetaDataView)).isEqualTo(columnName1);
   }
 
   @Test
@@ -148,25 +322,40 @@ public class RegionMappingTest {
 
     mapping = new RegionMapping(null, null, null, null, true, fieldMap);
 
-    assertThat(mapping.getFieldNameForColumn(columnName1)).isEqualTo(fieldName1);
+    assertThat(mapping.getFieldNameForColumn(columnName1, null)).isEqualTo(fieldName1);
+  }
+
+  @Test
+  public void returnsCachedFieldNameForColumn() {
+    fieldMap.put(fieldName1, columnName1);
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+
+    mapping.getColumnNameForField(fieldName1, tableMetaDataView);
+
+    assertThat(mapping.getFieldNameForColumn(columnName1, null)).isEqualTo(fieldName1);
+  }
+
+  @Test
+  public void returnsCachedColumnNameForField() {
+    fieldMap.put(fieldName1, columnName1);
+    mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    mapping.getFieldNameForColumn(columnName1, null);
+
+    TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
+
+    assertThat(mapping.getColumnNameForField(fieldName1, tableMetaDataView)).isEqualTo(columnName1);
   }
 
   @Test
   public void returnsAllMappings() {
     fieldMap.put(fieldName1, columnName1);
     fieldMap.put(fieldName2, columnName2);
-
     mapping = new RegionMapping(null, null, null, null, true, fieldMap);
 
-    assertThat(mapping.getFieldToColumnMap().size()).isEqualTo(2);
-    assertThat(mapping.getFieldToColumnMap()).containsOnlyKeys(fieldName1, fieldName2);
-    assertThat(mapping.getFieldToColumnMap()).containsEntry(fieldName1, columnName1)
-        .containsEntry(fieldName2, columnName2);
-    assertThat(mapping.getColumnToFieldMap().size()).isEqualTo(2);
-    assertThat(mapping.getColumnToFieldMap()).containsOnlyKeys(columnName1.toLowerCase(),
-        columnName2.toLowerCase());
-    assertThat(mapping.getColumnToFieldMap()).containsEntry(columnName1.toLowerCase(), fieldName1)
-        .containsEntry(columnName2.toLowerCase(), fieldName2);
+    Map<String, String> actualFieldMap = mapping.getFieldToColumnMap();
+
+    assertThat(actualFieldMap).isEqualTo(fieldMap);
   }
 
   @Test
@@ -240,4 +429,58 @@ public class RegionMappingTest {
     assertThat(result).isFalse();
   }
 
+  @Test
+  public void verifyMappingWithDifferentRegionNamesAreNotEqual() {
+    RegionMapping rm1 = new RegionMapping(null, null, null, null, false, null);
+    RegionMapping rm2 = new RegionMapping("name", null, null, null, false, null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithDifferentPdxClassNameAreNotEqual() {
+    RegionMapping rm1 = new RegionMapping(null, null, null, null, false, null);
+    RegionMapping rm2 = new RegionMapping(null, "pdxClass", null, null, false, null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithDifferentFieldMappingAreNotEqual() {
+    Map<String, String> fieldMap1 = new HashMap<>();
+    Map<String, String> fieldMap2 = new HashMap<>();
+    fieldMap1.put(fieldName1, columnName1);
+    fieldMap2.put(fieldName2, columnName2);
+
+    RegionMapping rm1 = new RegionMapping("name", "pdxClass", "tname", "cc", true, fieldMap1);
+    RegionMapping rm2 = new RegionMapping("name", "pdxClass", "tname", "cc", true, fieldMap2);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithOneFieldMappingNullAreNotEqual() {
+    Map<String, String> fieldMap1 = new HashMap<>();
+
+    RegionMapping rm1 = new RegionMapping("name", "pdxClass", "tname", "cc", true, fieldMap1);
+    RegionMapping rm2 = new RegionMapping("name", "pdxClass", "tname", "cc", true, null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+
+    rm1 = new RegionMapping("name", "pdxClass", "tname", "cc", true, null);
+    rm2 = new RegionMapping("name", "pdxClass", "tname", "cc", true, fieldMap1);
+    result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyToStringForExpectedMessage() {
+    mapping = new RegionMapping("name", "pdxClass", "tname", "cc", true, null);
+
+    String result = mapping.toString();
+    System.out.println("DEBUG:" + result);
+
+    assertThat(result).contains("RegionMapping{regionName='");
+  }
+
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index efd929f..100fa13 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -96,6 +97,7 @@ public class SqlHandlerTest {
     when(region.getRegionService()).thenReturn(cache);
     tableMetaDataManager = mock(TableMetaDataManager.class);
     tableMetaDataView = mock(TableMetaDataView.class);
+    when(tableMetaDataView.getTableName()).thenReturn(TABLE_NAME);
     when(tableMetaDataView.getKeyColumnName()).thenReturn(KEY_COLUMN);
     when(tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
         .thenReturn(tableMetaDataView);
@@ -182,7 +184,7 @@ public class SqlHandlerTest {
   public void writeWithCharField() throws Exception {
     String fieldName = "fieldName";
     Object fieldValue = 'S';
-    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(regionMapping.getColumnNameForField(eq(fieldName), any())).thenReturn(fieldName);
     when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
     when(value.getField(fieldName)).thenReturn(fieldValue);
 
@@ -199,7 +201,7 @@ public class SqlHandlerTest {
   public void writeWithNonCharField() throws Exception {
     String fieldName = "fieldName";
     int fieldValue = 100;
-    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(regionMapping.getColumnNameForField(eq(fieldName), any())).thenReturn(fieldName);
     when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
     when(value.getField(fieldName)).thenReturn(fieldValue);
 
@@ -217,7 +219,7 @@ public class SqlHandlerTest {
     String fieldName = "fieldName";
     Object fieldValue = null;
     int dataType = 0;
-    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(regionMapping.getColumnNameForField(eq(fieldName), any())).thenReturn(fieldName);
     when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
     when(value.getField(fieldName)).thenReturn(fieldValue);
 
@@ -236,7 +238,7 @@ public class SqlHandlerTest {
     Object fieldValue = null;
     int dataType = 79;
     when(tableMetaDataView.getColumnDataType(fieldName)).thenReturn(dataType);
-    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(regionMapping.getColumnNameForField(eq(fieldName), any())).thenReturn(fieldName);
     when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
     when(value.getField(fieldName)).thenReturn(fieldValue);
 
@@ -402,7 +404,7 @@ public class SqlHandlerTest {
     when(primaryKeys.next()).thenReturn(true).thenReturn(false);
 
     List<ColumnValue> columnValueList =
-        handler.getColumnToValueList(connection, regionMapping, key, value, Operation.GET);
+        handler.getColumnToValueList(tableMetaDataView, regionMapping, key, value, Operation.GET);
 
     assertThat(columnValueList).hasSize(1);
     assertThat(columnValueList.get(0).getColumnName()).isEqualTo(KEY_COLUMN);
@@ -412,13 +414,13 @@ public class SqlHandlerTest {
   public void returnsCorrectColumnsForUpsertOperations() throws Exception {
     ResultSet primaryKeys = getPrimaryKeysMetaData();
     String nonKeyColumn = "otherColumn";
-    when(regionMapping.getColumnNameForField(KEY_COLUMN)).thenReturn(KEY_COLUMN);
-    when(regionMapping.getColumnNameForField(nonKeyColumn)).thenReturn(nonKeyColumn);
+    when(regionMapping.getColumnNameForField(eq(KEY_COLUMN), any())).thenReturn(KEY_COLUMN);
+    when(regionMapping.getColumnNameForField(eq(nonKeyColumn), any())).thenReturn(nonKeyColumn);
     when(primaryKeys.next()).thenReturn(true).thenReturn(false);
     when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
 
-    List<ColumnValue> columnValueList =
-        handler.getColumnToValueList(connection, regionMapping, key, value, Operation.UPDATE);
+    List<ColumnValue> columnValueList = handler.getColumnToValueList(tableMetaDataView,
+        regionMapping, key, value, Operation.UPDATE);
 
     assertThat(columnValueList).hasSize(2);
     assertThat(columnValueList.get(0).getColumnName()).isEqualTo(nonKeyColumn);
@@ -430,8 +432,8 @@ public class SqlHandlerTest {
     ResultSet primaryKeys = getPrimaryKeysMetaData();
     when(primaryKeys.next()).thenReturn(true).thenReturn(false);
 
-    List<ColumnValue> columnValueList =
-        handler.getColumnToValueList(connection, regionMapping, key, value, Operation.DESTROY);
+    List<ColumnValue> columnValueList = handler.getColumnToValueList(tableMetaDataView,
+        regionMapping, key, value, Operation.DESTROY);
 
     assertThat(columnValueList).hasSize(1);
     assertThat(columnValueList.get(0).getColumnName()).isEqualTo(KEY_COLUMN);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
index b8bdbc2..3bc3ae9 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -44,7 +44,7 @@ public class SqlStatementFactoryTest {
   @Test
   public void getSelectQueryString() throws Exception {
     String expectedStatement =
-        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+        String.format("SELECT * FROM \"%s\" WHERE \"%s\" = ?", TABLE_NAME, KEY_COLUMN_NAME);
     List<ColumnValue> keyColumn = new ArrayList<>();
     keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null, 0));
 
@@ -56,7 +56,7 @@ public class SqlStatementFactoryTest {
   @Test
   public void getDestroySqlString() throws Exception {
     String expectedStatement =
-        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+        String.format("DELETE FROM \"%s\" WHERE \"%s\" = ?", TABLE_NAME, KEY_COLUMN_NAME);
     List<ColumnValue> keyColumn = new ArrayList<>();
     keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null, 0));
 
@@ -67,9 +67,9 @@ public class SqlStatementFactoryTest {
 
   @Test
   public void getUpdateSqlString() throws Exception {
-    String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
-        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(2).getColumnName(),
-        KEY_COLUMN_NAME);
+    String expectedStatement = String.format(
+        "UPDATE \"%s\" SET \"%s\" = ?, \"%s\" = ? WHERE \"%s\" = ?", TABLE_NAME,
+        columnValues.get(0).getColumnName(), columnValues.get(2).getColumnName(), KEY_COLUMN_NAME);
 
     String statement = factory.createUpdateSqlString(TABLE_NAME, columnValues);
 
@@ -78,9 +78,10 @@ public class SqlStatementFactoryTest {
 
   @Test
   public void getInsertSqlString() throws Exception {
-    String expectedStatement = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?,?,?)",
-        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(1).getColumnName(),
-        columnValues.get(2).getColumnName());
+    String expectedStatement =
+        String.format("INSERT INTO \"%s\" (\"%s\", \"%s\", \"%s\") VALUES (?,?,?)", TABLE_NAME,
+            columnValues.get(0).getColumnName(), columnValues.get(1).getColumnName(),
+            columnValues.get(2).getColumnName());
 
     String statement = factory.createInsertSqlString(TABLE_NAME, columnValues);
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
index d71a11d..41cef01 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
@@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -113,7 +114,8 @@ public class SqlToPdxInstanceCreatorTest {
     when(resultSet.next()).thenReturn(true).thenReturn(false);
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
     when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(PDX_FIELD_NAME_2);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_2), any()))
+        .thenReturn(PDX_FIELD_NAME_2);
     sqlToPdxInstanceCreator =
         new SqlToPdxInstanceCreator(cache, regionMapping, resultSet, COLUMN_NAME_1);
 
@@ -130,8 +132,10 @@ public class SqlToPdxInstanceCreatorTest {
     when(resultSet.next()).thenReturn(true).thenReturn(false);
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
     when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(PDX_FIELD_NAME_2);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any()))
+        .thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_2), any()))
+        .thenReturn(PDX_FIELD_NAME_2);
 
     sqlToPdxInstanceCreator.create();
 
@@ -147,7 +151,7 @@ public class SqlToPdxInstanceCreatorTest {
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
     when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
     String fieldName1 = "pdxFieldName1";
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(fieldName1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any())).thenReturn(fieldName1);
 
     sqlToPdxInstanceCreator.create();
 
@@ -161,7 +165,8 @@ public class SqlToPdxInstanceCreatorTest {
     setupResultSet(resultSet, fieldType);
     when(resultSet.next()).thenReturn(true).thenReturn(false);
     PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any()))
+        .thenReturn(PDX_FIELD_NAME_1);
 
     sqlToPdxInstanceCreator.create();
 
@@ -175,7 +180,8 @@ public class SqlToPdxInstanceCreatorTest {
     setupResultSet(resultSet, fieldType, null);
     when(resultSet.next()).thenReturn(true).thenReturn(false);
     PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any()))
+        .thenReturn(PDX_FIELD_NAME_1);
 
     sqlToPdxInstanceCreator.create();
 
@@ -193,7 +199,8 @@ public class SqlToPdxInstanceCreatorTest {
     when(resultSet.getString(1)).thenReturn("");
     when(resultSet.next()).thenReturn(true).thenReturn(false);
     PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any()))
+        .thenReturn(PDX_FIELD_NAME_1);
 
     sqlToPdxInstanceCreator.create();
 
@@ -210,8 +217,10 @@ public class SqlToPdxInstanceCreatorTest {
     setupPdxInstanceFactory(fieldType);
     setupResultSetForObject(resultSet, returnValue);
     when(resultSet.next()).thenReturn(true).thenReturn(false);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(PDX_FIELD_NAME_2);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any()))
+        .thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_2), any()))
+        .thenReturn(PDX_FIELD_NAME_2);
 
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage("Could not convert ");
@@ -243,7 +252,8 @@ public class SqlToPdxInstanceCreatorTest {
     when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
     when(pdxTypeRegistry.getPdxTypeForField(PDX_FIELD_NAME_1, pdxClassName)).thenReturn(pdxType);
     when(pdxType.getPdxField(PDX_FIELD_NAME_1)).thenReturn(null);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_1), any()))
+        .thenReturn(PDX_FIELD_NAME_1);
 
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage("Could not find PdxType");
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
index 28fcfa1..cf91661 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
@@ -46,7 +46,7 @@ public class TableMetaDataManagerIntegrationTest {
     connection = DriverManager.getConnection(CONNECTION_URL);
     statement = connection.createStatement();
     statement.execute("Create Table " + REGION_TABLE_NAME
-        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+        + " (\"id\" varchar(10) primary key not null, \"name\" varchar(10), \"age\" int)");
     manager = new TableMetaDataManager();
   }
 
@@ -73,7 +73,7 @@ public class TableMetaDataManagerIntegrationTest {
 
     String keyColumnName = metaData.getKeyColumnName();
 
-    assertThat(keyColumnName).isEqualToIgnoringCase("id");
+    assertThat(keyColumnName).isEqualTo("id");
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
index 6488b59..5ab18b1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
@@ -27,6 +27,9 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -113,11 +116,41 @@ public class TableMetaDataManagerTest {
   }
 
   @Test
-  public void throwsExceptionWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
+  public void returnsExactMatchTableNameWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
+    setupPrimaryKeysMetaData();
+    when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
     when(tablesResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
-    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase())
+        .thenReturn(TABLE_NAME);
+
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+
+    assertThat(data.getTableName()).isEqualTo(TABLE_NAME);
+  }
+
+  @Test
+  public void returnsMatchTableNameWhenMetaDataHasOneInexactMatch() throws Exception {
+    setupPrimaryKeysMetaData();
+    when(databaseMetaData.getColumns(any(), any(), eq(TABLE_NAME.toUpperCase()), any()))
+        .thenReturn(columnResultSet);
+    when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
+    when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
     when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase());
 
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+
+    assertThat(data.getTableName()).isEqualTo(TABLE_NAME.toUpperCase());
+  }
+
+  @Test
+  public void throwsExceptionWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
+    setupPrimaryKeysMetaData();
+    when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
+    when(tablesResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+
+    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toLowerCase())
+        .thenReturn(TABLE_NAME.toUpperCase());
+
     assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
         .isInstanceOf(JdbcConnectorException.class)
         .hasMessage("Duplicate tables that match region name");
@@ -168,55 +201,56 @@ public class TableMetaDataManagerTest {
   }
 
   @Test
-  public void validateThatCloseOnPrimaryKeysResultSetIsCalledByGetTableMetaDataView()
-      throws SQLException {
+  public void validateExpectedColumnNames() throws SQLException {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
+    when(columnResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    String columnName1 = "columnName1";
+    int columnDataType1 = 1;
+    String columnName2 = "columnName2";
+    int columnDataType2 = 2;
+    when(columnResultSet.getString("COLUMN_NAME")).thenReturn(columnName1).thenReturn(columnName2);
+    when(columnResultSet.getInt("DATA_TYPE")).thenReturn(columnDataType1)
+        .thenReturn(columnDataType2);
+    Set<String> expectedColumnNames = new HashSet<>(Arrays.asList(columnName1, columnName2));
 
     TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
+    Set<String> columnNames = data.getColumnNames();
 
-    verify(primaryKeysResultSet).close();
+    assertThat(columnNames).isEqualTo(expectedColumnNames);
   }
 
   @Test
-  public void validateThatCloseOnColumnResultSetIsCalledByGetTableMetaDataView()
+  public void validateThatCloseOnPrimaryKeysResultSetIsCalledByGetTableMetaDataView()
       throws SQLException {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
 
-    verify(columnResultSet).close();
+    verify(primaryKeysResultSet).close();
   }
 
   @Test
-  public void lookingUpDataTypeWithNameThatDiffersInCaseWillFindIt() throws SQLException {
+  public void validateThatCloseOnColumnResultSetIsCalledByGetTableMetaDataView()
+      throws SQLException {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
-    when(columnResultSet.next()).thenReturn(true).thenReturn(false);
-    String columnName1 = "columnName1";
-    int columnDataType1 = 1;
-    when(columnResultSet.getString("COLUMN_NAME")).thenReturn(columnName1);
-    when(columnResultSet.getInt("DATA_TYPE")).thenReturn(columnDataType1);
 
     TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
-    int dataType1 = data.getColumnDataType(columnName1.toUpperCase());
 
-    assertThat(dataType1).isEqualTo(columnDataType1);
+    verify(columnResultSet).close();
   }
 
   @Test
-  public void throwsExceptionWhenTwoColumnsWithSameCaseInsensitiveNameExist() throws Exception {
+  public void validateTableNameIsSetByGetTableMetaDataView() throws SQLException {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
-    when(columnResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
-    when(columnResultSet.getString("COLUMN_NAME")).thenReturn("colName").thenReturn("COLNAME");
 
-    assertThatThrownBy(() -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME))
-        .isInstanceOf(JdbcConnectorException.class).hasMessage(
-            "Column names must be different in case. Two columns both have the name colname");
-  }
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME);
 
+    assertThat(data.getTableName()).isEqualTo(TABLE_NAME);
+  }
 
   private void setupPrimaryKeysMetaData() throws SQLException {
     when(primaryKeysResultSet.getString("COLUMN_NAME")).thenReturn(KEY_COLUMN);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
index 9b812af..48bb2e0 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandIntegrationTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.connectors.jdbc.internal.cli;
 
 import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
 
 import org.junit.After;
 import org.junit.Before;
@@ -26,6 +27,7 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
 import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.connectors.jdbc.internal.TableMetaDataView;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.management.cli.Result;
 import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -80,8 +82,10 @@ public class CreateMappingCommandIntegrationTest {
     assertThat(regionMapping.getTableName()).isEqualTo(tableName);
     assertThat(regionMapping.getPdxClassName()).isEqualTo(pdxClass);
     assertThat(regionMapping.isPrimaryKeyInValue()).isEqualTo(keyInValue);
-    assertThat(regionMapping.getColumnNameForField("field1")).isEqualTo("column1");
-    assertThat(regionMapping.getColumnNameForField("field2")).isEqualTo("column2");
+    assertThat(regionMapping.getColumnNameForField("field1", mock(TableMetaDataView.class)))
+        .isEqualTo("column1");
+    assertThat(regionMapping.getColumnNameForField("field2", mock(TableMetaDataView.class)))
+        .isEqualTo("column2");
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/JdbcClusterConfigDistributedTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/JdbcClusterConfigDistributedTest.java
index 97a73c5..112ce49 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/JdbcClusterConfigDistributedTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/JdbcClusterConfigDistributedTest.java
@@ -20,6 +20,7 @@ import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.Serializable;
 import java.util.Properties;
@@ -34,6 +35,7 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
 import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.connectors.jdbc.internal.TableMetaDataView;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.InternalCache;
@@ -139,8 +141,10 @@ public class JdbcClusterConfigDistributedTest implements Serializable {
     assertThat(regionMapping.getTableName()).isEqualTo(tableName);
     assertThat(regionMapping.getPdxClassName()).isEqualTo(pdxClass);
     assertThat(regionMapping.isPrimaryKeyInValue()).isEqualTo(keyInValue);
-    assertThat(regionMapping.getColumnNameForField("field1")).isEqualTo("column1");
-    assertThat(regionMapping.getColumnNameForField("field2")).isEqualTo("column2");
+    assertThat(regionMapping.getColumnNameForField("field1", mock(TableMetaDataView.class)))
+        .isEqualTo("column1");
+    assertThat(regionMapping.getColumnNameForField("field2", mock(TableMetaDataView.class)))
+        .isEqualTo("column2");
   }
 
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
index 7612527..912da68 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
@@ -49,6 +49,7 @@ import org.apache.geode.connectors.jdbc.internal.ConnectionConfigBuilder;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
 import org.apache.geode.connectors.jdbc.internal.RegionMapping;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingBuilder;
+import org.apache.geode.connectors.jdbc.internal.TableMetaDataView;
 import org.apache.geode.internal.cache.extension.ExtensionPoint;
 import org.apache.geode.internal.cache.xmlcache.CacheCreation;
 import org.apache.geode.test.junit.categories.UnitTest;
@@ -234,7 +235,8 @@ public class ElementTypeTest {
     ElementType.FIELD_MAPPING.startElement(stack, attributes);
 
     RegionMapping regionMapping = ((RegionMappingBuilder) stack.pop()).build();
-    assertThat(regionMapping.getColumnNameForField("fieldName")).isEqualTo("columnName");
+    assertThat(regionMapping.getColumnNameForField("fieldName", mock(TableMetaDataView.class)))
+        .isEqualTo("columnName");
   }
 
   @Test
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
index 22d0384..4b3efff 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
@@ -16,8 +16,10 @@ package org.apache.geode.pdx.internal;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 
@@ -251,6 +253,20 @@ public class ClientTypeRegistration implements TypeRegistration {
   }
 
   @Override
+  public Set<PdxType> getPdxTypesForClassName(String className) {
+    Set<PdxType> result = new HashSet<>();
+    for (Object value : types().values()) {
+      if (value instanceof PdxType) {
+        PdxType pdxType = (PdxType) value;
+        if (pdxType.getClassName().equals(className)) {
+          result.add(pdxType);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
   public void testClearRegistry() {}
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/LonerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/LonerTypeRegistration.java
index b6d8a04..9d78a57 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/LonerTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/LonerTypeRegistration.java
@@ -15,6 +15,7 @@
 package org.apache.geode.pdx.internal;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.internal.cache.InternalCache;
@@ -147,6 +148,11 @@ public class LonerTypeRegistration implements TypeRegistration {
   }
 
   @Override
+  public Set<PdxType> getPdxTypesForClassName(String className) {
+    return delegate.getPdxTypesForClassName(className);
+  }
+
+  @Override
   public void testClearRegistry() {}
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/NullTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/NullTypeRegistration.java
index 2f5aacc..625383e 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/NullTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/NullTypeRegistration.java
@@ -16,6 +16,7 @@ package org.apache.geode.pdx.internal;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.pdx.PdxInitializationException;
@@ -90,6 +91,11 @@ public class NullTypeRegistration implements TypeRegistration {
   }
 
   @Override
+  public Set<PdxType> getPdxTypesForClassName(String className) {
+    return Collections.emptySet();
+  }
+
+  @Override
   public void testClearRegistry() {
 
   }
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
index 32e4c31..8aa76b1 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java
@@ -102,8 +102,7 @@ public class PeerTypeRegistration implements TypeRegistration {
   private Map<EnumInfo, EnumId> enumToId =
       Collections.synchronizedMap(new HashMap<EnumInfo, EnumId>());
 
-  private final Map<String, Set<PdxType>> classToType =
-      new CopyOnWriteHashMap<String, Set<PdxType>>();
+  private final Map<String, CopyOnWriteHashSet<PdxType>> classToType = new CopyOnWriteHashMap<>();
 
   private volatile boolean typeRegistryInUse = false;
 
@@ -753,9 +752,10 @@ public class PeerTypeRegistration implements TypeRegistration {
   private void updateClassToTypeMap(PdxType type) {
     if (type != null) {
       synchronized (this.classToType) {
-        if (type.getClassName().equals(JSONFormatter.JSON_CLASSNAME))
-          return;// no need to include here
-        Set<PdxType> pdxTypeSet = this.classToType.get(type.getClassName());
+        if (type.getClassName().equals(JSONFormatter.JSON_CLASSNAME)) {
+          return; // no need to include here
+        }
+        CopyOnWriteHashSet<PdxType> pdxTypeSet = this.classToType.get(type.getClassName());
         if (pdxTypeSet == null) {
           pdxTypeSet = new CopyOnWriteHashSet<PdxType>();
         }
@@ -767,21 +767,29 @@ public class PeerTypeRegistration implements TypeRegistration {
 
   @Override
   public PdxType getPdxTypeForField(String fieldName, String className) {
-    Set<PdxType> pdxTypes = classToType.get(className);
-    if (pdxTypes != null) {
-      for (PdxType pdxType : pdxTypes) {
-        if (pdxType.getPdxField(fieldName) != null) {
-          return pdxType;
-        }
+    Set<PdxType> pdxTypes = getPdxTypesForClassName(className);
+    for (PdxType pdxType : pdxTypes) {
+      if (pdxType.getPdxField(fieldName) != null) {
+        return pdxType;
       }
     }
     return null;
   }
 
+  @Override
+  public Set<PdxType> getPdxTypesForClassName(String className) {
+    CopyOnWriteHashSet<PdxType> pdxTypeSet = classToType.get(className);
+    if (pdxTypeSet == null) {
+      return Collections.emptySet();
+    } else {
+      return pdxTypeSet.getSnapshot();
+    }
+  }
+
   /**
    * For testing purpose
    */
-  public Map<String, Set<PdxType>> getClassToType() {
+  public Map<String, CopyOnWriteHashSet<PdxType>> getClassToType() {
     return classToType;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistration.java
index 6d23389..c263f9a 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistration.java
@@ -15,6 +15,7 @@
 package org.apache.geode.pdx.internal;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.geode.cache.wan.GatewaySender;
 
@@ -81,12 +82,16 @@ public interface TypeRegistration {
   /**
    * Returns PdxType having the field
    *
-   * @param fieldName
-   * @param className
    * @return PdxType or null if field not present
    */
   PdxType getPdxTypeForField(String fieldName, String className);
 
+  /**
+   * Returns all the PdxTypes for the given class name.
+   * An empty set will be returned if no types exist.
+   */
+  Set<PdxType> getPdxTypesForClassName(String className);
+
   /*
    * test hook
    */
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
index 5537694..2479d29 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
@@ -17,6 +17,7 @@ package org.apache.geode.pdx.internal;
 import static java.lang.Integer.*;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.logging.log4j.Logger;
@@ -497,6 +498,14 @@ public class TypeRegistry {
     return this.distributedTypeRegistry.getPdxTypeForField(fieldName, className);
   }
 
+  /**
+   * Returns all the PdxTypes for the given class name.
+   * An empty set will be returned if no types exist.
+   */
+  public Set<PdxType> getPdxTypesForClassName(String className) {
+    return this.distributedTypeRegistry.getPdxTypesForClassName(className);
+  }
+
   public void addImportedType(int typeId, PdxType importedType) {
     PdxType existing = getType(typeId);
     if (existing != null && !existing.equals(importedType)) {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
index 5018472..1febfd2 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java
@@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 import org.junit.Test;
@@ -50,6 +49,7 @@ import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.cache30.ClientServerTestCase;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.pdx.FieldType;
@@ -2648,7 +2648,8 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
       public Object call() throws Exception {
         TypeRegistration registration = getCache().getPdxRegistry().getTypeRegistration();
         Assert.assertTrue(registration instanceof PeerTypeRegistration);
-        Map<String, Set<PdxType>> m = ((PeerTypeRegistration) registration).getClassToType();
+        Map<String, CopyOnWriteHashSet<PdxType>> m =
+            ((PeerTypeRegistration) registration).getClassToType();
         assertEquals(1, m.size());
         assertEquals("PdxVersionedNewPortfolio", m.keySet().iterator().next());
         assertEquals(2, m.values().iterator().next().size());
@@ -2665,7 +2666,8 @@ public class PdxQueryDUnitTest extends PDXQueryTestBase {
       public Object call() throws Exception {
         TypeRegistration registration = getCache().getPdxRegistry().getTypeRegistration();
         Assert.assertTrue(registration instanceof PeerTypeRegistration);
-        Map<String, Set<PdxType>> m = ((PeerTypeRegistration) registration).getClassToType();
+        Map<String, CopyOnWriteHashSet<PdxType>> m =
+            ((PeerTypeRegistration) registration).getClassToType();
         assertEquals(1, m.size());
         assertEquals("PdxVersionedNewPortfolio", m.keySet().iterator().next());
         assertEquals(2, m.values().iterator().next().size());

-- 
To stop receiving notification emails like this one, please contact
agingade@apache.org.