You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/03/15 01:23:26 UTC

[geode] branch develop updated: GEODE-4693: fix JDBCLoader on non-object pdx fields (#1517)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f7c7451  GEODE-4693: fix JDBCLoader on non-object pdx fields (#1517)
f7c7451 is described below

commit f7c745130ba8e53a60a8cc21fd6173dbbc015f85
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Mar 14 18:23:23 2018 -0700

    GEODE-4693: fix JDBCLoader on non-object pdx fields (#1517)
    
    If the region mapping has a pdxClassName then the JdbcLoader will look for a pdx type that is already defined and use its field types when creating the PdxInstance. If the pdx type does not have a field that corresponds to an existing column then an exception is thrown.
    
    A pdx field of type "char" or an instance of "java.lang.Character" will be sent to SQL as a String of size 1.
    A pdx field of type "Date" will be sent to SQL as a java.sql.Timestamp.
    
    A new external API was added to GemFireCache. It is the method "registerPdxMetaData".
    It only needs to be called on clients if they are going to do reads using a JdbcLoader of data that was not written using the JdbcWriter or JdbcAsyncWriter.
---
 .../connectors/jdbc/JdbcConnectorException.java    |  37 +-
 .../jdbc/internal/AbstractJdbcCallback.java        |   3 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |  87 ++--
 .../jdbc/internal/SqlToPdxInstanceCreator.java     | 205 ++++++++
 .../jdbc/internal/TableKeyColumnManager.java       |  10 +-
 .../jdbc/ClassWithSupportedPdxFields.java          | 181 ++++++++
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |  19 +-
 .../jdbc/JdbcConnectorExceptionTest.java           |  42 +-
 .../geode/connectors/jdbc/JdbcDUnitTest.java       | 218 +++++++--
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java | 141 +++++-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java |   7 +-
 .../connectors/jdbc/internal/SqlHandlerTest.java   | 190 +++-----
 .../jdbc/internal/SqlToPdxInstanceCreatorTest.java | 515 +++++++++++++++++++++
 .../jdbc/internal/TableKeyColumnManagerTest.java   |   8 +-
 .../jdbc/internal/TestConfigService.java           |  22 +-
 .../java/org/apache/geode/cache/GemFireCache.java  |  17 +
 .../geode/internal/cache/GemFireCacheImpl.java     |  11 +
 .../internal/cache/xmlcache/CacheCreation.java     |   5 +
 .../pdx/internal/AutoSerializableManager.java      |   2 +-
 .../apache/geode/pdx/internal/TypeRegistry.java    |   2 +-
 .../DataCommandFunctionWithPDXJUnitTest.java       |   2 +-
 .../geode/pdx/AutoSerializableJUnitTest.java       |  10 +-
 .../apache/geode/pdx/PdxClientServerDUnitTest.java |  14 +-
 .../rules/DistributedRestoreSystemProperties.java  |   8 +-
 .../geode/test/junit/rules/ServerStarterRule.java  |  18 +-
 25 files changed, 1490 insertions(+), 284 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcConnectorException.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcConnectorException.java
index d152873..7403415 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcConnectorException.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcConnectorException.java
@@ -42,15 +42,39 @@ public class JdbcConnectorException extends CacheRuntimeException {
    *         its message if not
    */
   public static JdbcConnectorException createException(Exception e) {
-    String message;
     if (containsNonSerializableException(e)) {
-      message = e.getMessage() + System.lineSeparator() + ExceptionUtils.getFullStackTrace(e);
+      String message =
+          e.getMessage() + System.lineSeparator() + ExceptionUtils.getFullStackTrace(e);
       return new JdbcConnectorException(message);
     } else {
       return new JdbcConnectorException(e);
     }
   }
 
+  /**
+   * Create a new JdbcConnectorException by first checking to see if the causing exception is or
+   * contains an exception that potentially could not be deserialized by remote systems receiving
+   * the serialized exception.
+   *
+   * @param message message of this Exception
+   * @param e cause of this Exception
+   * @return a new JdbcConnectorException containing either the causing exception, if it can be
+   *         serialized/deserialized by Geode, or containing the causing exception stack trace in
+   *         its message if not
+   */
+  public static JdbcConnectorException createException(String message, Exception e) {
+    if (containsNonSerializableException(e)) {
+      message += e.getMessage() + System.lineSeparator() + ExceptionUtils.getFullStackTrace(e);
+      return new JdbcConnectorException(message);
+    } else {
+      return new JdbcConnectorException(message, e);
+    }
+  }
+
+  public JdbcConnectorException(String message) {
+    super(message);
+  }
+
   /*
    * SQLExceptions likely are instances of or contain exceptions from the underlying SQL driver
    * and potentially cannot be deserialzed by other systems (e.g. client or locators) that do not
@@ -65,11 +89,12 @@ public class JdbcConnectorException extends CacheRuntimeException {
       return true;
     }
 
-    Throwable cause;
-    while ((cause = e.getCause()) != null) {
+    Throwable cause = e.getCause();
+    while (cause != null) {
       if (cause instanceof SQLException) {
         return true;
       }
+      cause = cause.getCause();
     }
     return false;
   }
@@ -78,7 +103,7 @@ public class JdbcConnectorException extends CacheRuntimeException {
     super(e);
   }
 
-  public JdbcConnectorException(String message) {
-    super(message);
+  private JdbcConnectorException(String message, Exception e) {
+    super(message, e);
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index 9074e68..4b23f6e 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -61,8 +61,9 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
     if (sqlHandler == null) {
       this.cache = cache;
       JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
+      TableKeyColumnManager tableKeyColumnManager = new TableKeyColumnManager();
       DataSourceManager manager = new DataSourceManager(new HikariJdbcDataSourceFactory());
-      sqlHandler = new SqlHandler(manager, service);
+      sqlHandler = new SqlHandler(manager, tableKeyColumnManager, service);
     }
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 61d85f2..d45815a 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -17,19 +17,18 @@ package org.apache.geode.connectors.jdbc.internal;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.connectors.jdbc.JdbcConnectorException;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxInstanceFactory;
 
 @Experimental
 public class SqlHandler {
@@ -37,11 +36,7 @@ public class SqlHandler {
   private final DataSourceManager manager;
   private final TableKeyColumnManager tableKeyColumnManager;
 
-  public SqlHandler(DataSourceManager manager, JdbcConnectorService configService) {
-    this(manager, new TableKeyColumnManager(), configService);
-  }
-
-  SqlHandler(DataSourceManager manager, TableKeyColumnManager tableKeyColumnManager,
+  public SqlHandler(DataSourceManager manager, TableKeyColumnManager tableKeyColumnManager,
       JdbcConnectorService configService) {
     this.manager = manager;
     this.tableKeyColumnManager = tableKeyColumnManager;
@@ -71,18 +66,29 @@ public class SqlHandler {
           getColumnToValueList(connection, regionMapping, key, null, Operation.GET);
       try (PreparedStatement statement =
           getPreparedStatement(connection, columnList, tableName, Operation.GET)) {
-        PdxInstanceFactory factory = getPdxInstanceFactory(region, regionMapping);
-        String keyColumnName = getKeyColumnName(connection, tableName);
-        result = executeReadStatement(statement, columnList, factory, regionMapping, keyColumnName);
+        try (ResultSet resultSet = executeReadQuery(statement, columnList)) {
+          String keyColumnName = getKeyColumnName(connection, tableName);
+          InternalCache cache = (InternalCache) region.getRegionService();
+          SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
+              new SqlToPdxInstanceCreator(cache, regionMapping, resultSet, keyColumnName);
+          result = sqlToPdxInstanceCreator.create();
+        }
       }
     }
     return result;
   }
 
+  private ResultSet executeReadQuery(PreparedStatement statement, List<ColumnValue> columnList)
+      throws SQLException {
+    setValuesInStatement(statement, columnList);
+    return statement.executeQuery();
+  }
+
+
   private RegionMapping getMappingForRegion(String regionName) {
     RegionMapping regionMapping = this.configService.getMappingForRegion(regionName);
     if (regionMapping == null) {
-      throw new IllegalStateException("JDBC mapping for region " + regionName
+      throw new JdbcConnectorException("JDBC mapping for region " + regionName
           + " not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
     }
     return regionMapping;
@@ -92,7 +98,7 @@ public class SqlHandler {
     ConnectionConfiguration connectionConfig =
         this.configService.getConnectionConfig(connectionConfigName);
     if (connectionConfig == null) {
-      throw new IllegalStateException("JDBC connection with name " + connectionConfigName
+      throw new JdbcConnectorException("JDBC connection with name " + connectionConfigName
           + " not found. Create the connection with the gfsh command 'create jdbc-connection'");
     }
     return connectionConfig;
@@ -102,59 +108,19 @@ public class SqlHandler {
     return this.tableKeyColumnManager.getKeyColumnName(connection, tableName);
   }
 
-  private <K, V> PdxInstanceFactory getPdxInstanceFactory(Region<K, V> region,
-      RegionMapping regionMapping) {
-    InternalCache cache = (InternalCache) region.getRegionService();
-    String valueClassName = regionMapping.getPdxClassName();
-    PdxInstanceFactory factory;
-    if (valueClassName != null) {
-      factory = cache.createPdxInstanceFactory(valueClassName);
-    } else {
-      factory = cache.createPdxInstanceFactory("no class", false);
-    }
-    return factory;
-  }
-
-  PdxInstance executeReadStatement(PreparedStatement statement, List<ColumnValue> columnList,
-      PdxInstanceFactory factory, RegionMapping regionMapping, String keyColumnName)
-      throws SQLException {
-    PdxInstance pdxInstance = null;
-    setValuesInStatement(statement, columnList);
-    try (ResultSet resultSet = statement.executeQuery()) {
-      if (resultSet.next()) {
-        ResultSetMetaData metaData = resultSet.getMetaData();
-        int ColumnsNumber = metaData.getColumnCount();
-        for (int i = 1; i <= ColumnsNumber; i++) {
-          Object columnValue = resultSet.getObject(i);
-          String columnName = metaData.getColumnName(i);
-          String fieldName = mapColumnNameToFieldName(columnName, regionMapping);
-          if (regionMapping.isPrimaryKeyInValue() || !keyColumnName.equalsIgnoreCase(columnName)) {
-            factory.writeField(fieldName, columnValue, Object.class);
-          }
-        }
-        if (resultSet.next()) {
-          throw new JdbcConnectorException(
-              "Multiple rows returned for query: " + resultSet.getStatement().toString());
-        }
-        pdxInstance = factory.create();
-      }
-    }
-    return pdxInstance;
-  }
-
   private void setValuesInStatement(PreparedStatement statement, List<ColumnValue> columnList)
       throws SQLException {
     int index = 0;
     for (ColumnValue columnValue : columnList) {
       index++;
-      statement.setObject(index, columnValue.getValue());
+      Object value = columnValue.getValue();
+      if (value instanceof Character) {
+        value = ((Character) value).toString();
+      }
+      statement.setObject(index, value);
     }
   }
 
-  private String mapColumnNameToFieldName(String columnName, RegionMapping regionMapping) {
-    return regionMapping.getFieldNameForColumn(columnName);
-  }
-
   public <K, V> void write(Region<K, V> region, Operation operation, K key, PdxInstance value)
       throws SQLException {
     if (value == null && operation != Operation.DESTROY) {
@@ -192,9 +158,7 @@ public class SqlHandler {
         }
       }
 
-      if (updateCount != 1) {
-        throw new IllegalStateException("Unexpected updateCount " + updateCount);
-      }
+      assert updateCount == 1;
     }
   }
 
@@ -225,7 +189,7 @@ public class SqlHandler {
     } else if (operation.isGet()) {
       return statementFactory.createSelectQueryString(tableName, columnList);
     } else {
-      throw new IllegalArgumentException("unsupported operation " + operation);
+      throw new InternalGemFireException("unsupported operation " + operation);
     }
   }
 
@@ -257,4 +221,5 @@ public class SqlHandler {
     }
     return result;
   }
+
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
new file mode 100644
index 0000000..fd0480b
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.FieldType;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxField;
+import org.apache.geode.pdx.internal.PdxType;
+
+class SqlToPdxInstanceCreator {
+  private final InternalCache cache;
+  private final RegionMapping regionMapping;
+  private final ResultSet resultSet;
+  private final String keyColumnName;
+
+  public SqlToPdxInstanceCreator(InternalCache cache, RegionMapping regionMapping,
+      ResultSet resultSet, String keyColumnName) {
+    this.cache = cache;
+    this.regionMapping = regionMapping;
+    this.resultSet = resultSet;
+    this.keyColumnName = keyColumnName;
+  }
+
+  public PdxInstance create() throws SQLException {
+    PdxInstanceFactory factory = getPdxInstanceFactory(cache, regionMapping);
+    PdxInstance pdxInstance = null;
+    if (resultSet.next()) {
+      ResultSetMetaData metaData = resultSet.getMetaData();
+      int ColumnsNumber = metaData.getColumnCount();
+      for (int i = 1; i <= ColumnsNumber; i++) {
+        String columnName = metaData.getColumnName(i);
+        if (regionMapping.isPrimaryKeyInValue() || !keyColumnName.equalsIgnoreCase(columnName)) {
+          String fieldName = mapColumnNameToFieldName(columnName, regionMapping);
+          FieldType fieldType = getFieldType(cache, regionMapping.getPdxClassName(), fieldName);
+          writeField(factory, resultSet, i, fieldName, fieldType);
+        }
+      }
+      if (resultSet.next()) {
+        throw new JdbcConnectorException(
+            "Multiple rows returned for query: " + resultSet.getStatement().toString());
+      }
+      pdxInstance = factory.create();
+    }
+    return pdxInstance;
+  }
+
+  private String mapColumnNameToFieldName(String columnName, RegionMapping regionMapping) {
+    return regionMapping.getFieldNameForColumn(columnName);
+  }
+
+  private PdxInstanceFactory getPdxInstanceFactory(InternalCache cache,
+      RegionMapping regionMapping) {
+    String valueClassName = regionMapping.getPdxClassName();
+    PdxInstanceFactory factory;
+    if (valueClassName != null) {
+      factory = cache.createPdxInstanceFactory(valueClassName);
+    } else {
+      factory = cache.createPdxInstanceFactory("no class", false);
+    }
+    return factory;
+  }
+
+  /**
+   * @throws SQLException if the column value get fails
+   */
+  private void writeField(PdxInstanceFactory factory, ResultSet resultSet, int columnIndex,
+      String fieldName, FieldType fieldType) throws SQLException {
+    switch (fieldType) {
+      case STRING:
+        factory.writeString(fieldName, resultSet.getString(columnIndex));
+        break;
+      case CHAR:
+        char charValue = 0;
+        String columnValue = resultSet.getString(columnIndex);
+        if (columnValue != null && columnValue.length() > 0) {
+          charValue = columnValue.toCharArray()[0];
+        }
+        factory.writeChar(fieldName, charValue);
+        break;
+      case SHORT:
+        factory.writeShort(fieldName, resultSet.getShort(columnIndex));
+        break;
+      case INT:
+        factory.writeInt(fieldName, resultSet.getInt(columnIndex));
+        break;
+      case LONG:
+        factory.writeLong(fieldName, resultSet.getLong(columnIndex));
+        break;
+      case FLOAT:
+        factory.writeFloat(fieldName, resultSet.getFloat(columnIndex));
+        break;
+      case DOUBLE:
+        factory.writeDouble(fieldName, resultSet.getDouble(columnIndex));
+        break;
+      case BYTE:
+        factory.writeByte(fieldName, resultSet.getByte(columnIndex));
+        break;
+      case BOOLEAN:
+        factory.writeBoolean(fieldName, resultSet.getBoolean(columnIndex));
+        break;
+      case DATE:
+        java.sql.Timestamp sqlDate = resultSet.getTimestamp(columnIndex);
+        java.util.Date pdxDate = null;
+        if (sqlDate != null) {
+          pdxDate = new java.util.Date(sqlDate.getTime());
+        }
+        factory.writeDate(fieldName, pdxDate);
+        break;
+      case BYTE_ARRAY:
+        factory.writeByteArray(fieldName, resultSet.getBytes(columnIndex));
+        break;
+      case BOOLEAN_ARRAY:
+        factory.writeBooleanArray(fieldName,
+            convertJdbcObjectToJavaType(boolean[].class, resultSet.getObject(columnIndex)));
+        break;
+      case CHAR_ARRAY:
+        factory.writeCharArray(fieldName,
+            convertJdbcObjectToJavaType(char[].class, resultSet.getObject(columnIndex)));
+        break;
+      case SHORT_ARRAY:
+        factory.writeShortArray(fieldName,
+            convertJdbcObjectToJavaType(short[].class, resultSet.getObject(columnIndex)));
+        break;
+      case INT_ARRAY:
+        factory.writeIntArray(fieldName,
+            convertJdbcObjectToJavaType(int[].class, resultSet.getObject(columnIndex)));
+        break;
+      case LONG_ARRAY:
+        factory.writeLongArray(fieldName,
+            convertJdbcObjectToJavaType(long[].class, resultSet.getObject(columnIndex)));
+        break;
+      case FLOAT_ARRAY:
+        factory.writeFloatArray(fieldName,
+            convertJdbcObjectToJavaType(float[].class, resultSet.getObject(columnIndex)));
+        break;
+      case DOUBLE_ARRAY:
+        factory.writeDoubleArray(fieldName,
+            convertJdbcObjectToJavaType(double[].class, resultSet.getObject(columnIndex)));
+        break;
+      case STRING_ARRAY:
+        factory.writeStringArray(fieldName,
+            convertJdbcObjectToJavaType(String[].class, resultSet.getObject(columnIndex)));
+        break;
+      case OBJECT_ARRAY:
+        factory.writeObjectArray(fieldName,
+            convertJdbcObjectToJavaType(Object[].class, resultSet.getObject(columnIndex)));
+        break;
+      case ARRAY_OF_BYTE_ARRAYS:
+        factory.writeArrayOfByteArrays(fieldName,
+            convertJdbcObjectToJavaType(byte[][].class, resultSet.getObject(columnIndex)));
+        break;
+      case OBJECT:
+        factory.writeObject(fieldName, resultSet.getObject(columnIndex));
+        break;
+    }
+  }
+
+  private <T> T convertJdbcObjectToJavaType(Class<T> javaType, Object jdbcObject) {
+    try {
+      return javaType.cast(jdbcObject);
+    } catch (ClassCastException classCastException) {
+      throw JdbcConnectorException.createException("Could not convert "
+          + jdbcObject.getClass().getTypeName() + " to " + javaType.getTypeName(),
+          classCastException);
+    }
+  }
+
+  private FieldType getFieldType(InternalCache cache, String pdxClassName, String fieldName) {
+    if (pdxClassName == null) {
+      return FieldType.OBJECT;
+    }
+
+    PdxType pdxType = cache.getPdxRegistry().getPdxTypeForField(fieldName, pdxClassName);
+    if (pdxType != null) {
+      PdxField pdxField = pdxType.getPdxField(fieldName);
+      if (pdxField != null) {
+        return pdxField.getFieldType();
+      }
+    }
+
+    throw new JdbcConnectorException("Could not find PdxType for field " + fieldName
+        + ". Add class " + pdxClassName + " with " + fieldName + " to pdx registry.");
+
+  }
+
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java
index 5fccb66..b7d972d 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManager.java
@@ -30,7 +30,7 @@ import org.apache.geode.connectors.jdbc.JdbcConnectorException;
  * than one column as a primary key or no columns then an exception is thrown. The computation is
  * remembered so that it does not need to be recomputed for the same table name.
  */
-class TableKeyColumnManager {
+public class TableKeyColumnManager {
   private final ConcurrentMap<String, String> tableToPrimaryKeyMap = new ConcurrentHashMap<>();
 
   public String getKeyColumnName(Connection connection, String tableName) {
@@ -58,14 +58,14 @@ class TableKeyColumnManager {
       String name = tables.getString("TABLE_NAME");
       if (name.equalsIgnoreCase(tableName)) {
         if (realTableName != null) {
-          throw new IllegalStateException("Duplicate tables that match region name");
+          throw new JdbcConnectorException("Duplicate tables that match region name");
         }
         realTableName = name;
       }
     }
 
     if (realTableName == null) {
-      throw new IllegalStateException("no table was found that matches " + tableName);
+      throw new JdbcConnectorException("no table was found that matches " + tableName);
     }
     return realTableName;
   }
@@ -74,12 +74,12 @@ class TableKeyColumnManager {
       throws SQLException {
     ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName);
     if (!primaryKeys.next()) {
-      throw new IllegalStateException(
+      throw new JdbcConnectorException(
           "The table " + tableName + " does not have a primary key column.");
     }
     String key = primaryKeys.getString("COLUMN_NAME");
     if (primaryKeys.next()) {
-      throw new IllegalStateException(
+      throw new JdbcConnectorException(
           "The table " + tableName + " has more than one primary key column.");
     }
     return key;
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java
new file mode 100644
index 0000000..fdd2b10
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.connectors.jdbc;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.geode.internal.PdxSerializerObject;
+
+public class ClassWithSupportedPdxFields implements PdxSerializerObject, Serializable {
+  private boolean aboolean;
+  private byte abyte;
+  private short ashort;
+  private int anint;
+  private long along;
+  private float afloat;
+  private double adouble;
+  private String astring;
+  private Date adate;
+  private Object anobject;
+  private byte[] abytearray;
+  private char achar;
+
+  public ClassWithSupportedPdxFields() {}
+
+  public ClassWithSupportedPdxFields(boolean aboolean, byte abyte, short ashort, int anint,
+      long along, float afloat, double adouble, String astring, Date adate, Object anobject,
+      byte[] abytearray, char achar) {
+    this.aboolean = aboolean;
+    this.abyte = abyte;
+    this.ashort = ashort;
+    this.anint = anint;
+    this.along = along;
+    this.afloat = afloat;
+    this.adouble = adouble;
+    this.astring = astring;
+    this.adate = adate;
+    this.anobject = anobject;
+    this.abytearray = abytearray;
+    this.achar = achar;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ClassWithSupportedPdxFields that = (ClassWithSupportedPdxFields) o;
+
+    if (isAboolean() != that.isAboolean()) {
+      return false;
+    }
+    if (getAbyte() != that.getAbyte()) {
+      return false;
+    }
+    if (getAshort() != that.getAshort()) {
+      return false;
+    }
+    if (getAnint() != that.getAnint()) {
+      return false;
+    }
+    if (getAlong() != that.getAlong()) {
+      return false;
+    }
+    if (Float.compare(that.getAfloat(), getAfloat()) != 0) {
+      return false;
+    }
+    if (Double.compare(that.getAdouble(), getAdouble()) != 0) {
+      return false;
+    }
+    if (getAstring() != null ? !getAstring().equals(that.getAstring())
+        : that.getAstring() != null) {
+      return false;
+    }
+    if (getAdate() != null ? !getAdate().equals(that.getAdate()) : that.getAdate() != null) {
+      return false;
+    }
+    if (getAnobject() != null ? !getAnobject().equals(that.getAnobject())
+        : that.getAnobject() != null) {
+      return false;
+    }
+    if (getAchar() != that.getAchar()) {
+      return false;
+    }
+    return Arrays.equals(getAbytearray(), that.getAbytearray());
+  }
+
+  @Override
+  public int hashCode() {
+    int result;
+    long temp;
+    result = (isAboolean() ? 1 : 0);
+    result = 31 * result + (int) getAbyte();
+    result = 31 * result + (int) getAshort();
+    result = 31 * result + getAnint();
+    result = 31 * result + (int) (getAlong() ^ (getAlong() >>> 32));
+    result = 31 * result + (getAfloat() != +0.0f ? Float.floatToIntBits(getAfloat()) : 0);
+    temp = Double.doubleToLongBits(getAdouble());
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    result = 31 * result + (getAstring() != null ? getAstring().hashCode() : 0);
+    result = 31 * result + (getAdate() != null ? getAdate().hashCode() : 0);
+    result = 31 * result + (getAnobject() != null ? getAnobject().hashCode() : 0);
+    result = 31 * result + Arrays.hashCode(getAbytearray());
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ClassWithSupportedPdxFields{" + "aboolean=" + isAboolean() + ", abyte=" + getAbyte()
+        + ", achar=" + getAchar() + ", ashort=" + getAshort() + ", anint=" + getAnint() + ", along="
+        + getAlong() + ", afloat=" + getAfloat() + ", adouble=" + getAdouble() + ", astring='"
+        + getAstring() + '\'' + ", adate=" + getAdate() + ", anobject=" + getAnobject()
+        + ", abytearray=" + Arrays.toString(getAbytearray()) + '}';
+  }
+
+  public boolean isAboolean() {
+    return aboolean;
+  }
+
+  public byte getAbyte() {
+    return abyte;
+  }
+
+  public char getAchar() {
+    return achar;
+  }
+
+  public short getAshort() {
+    return ashort;
+  }
+
+  public int getAnint() {
+    return anint;
+  }
+
+  public long getAlong() {
+    return along;
+  }
+
+  public float getAfloat() {
+    return afloat;
+  }
+
+  public double getAdouble() {
+    return adouble;
+  }
+
+  public String getAstring() {
+    return astring;
+  }
+
+  public Date getAdate() {
+    return adate;
+  }
+
+  public Object getAnobject() {
+    return anobject;
+  }
+
+  public byte[] getAbytearray() {
+    return abytearray;
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 28188d2..9da3033 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfigExistsException;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.connectors.jdbc.internal.TableKeyColumnManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
 import org.apache.geode.internal.cache.InternalCache;
@@ -104,23 +105,9 @@ public class JdbcAsyncWriterIntegrationTest {
   }
 
   @Test
-  public void canInsertIntoTable() throws Exception {
-    employees.put("1", pdxEmployee1);
-    employees.put("2", pdxEmployee2);
-
-    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
-
-    ResultSet resultSet =
-        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
-    assertRecordMatchesEmployee(resultSet, "1", employee1);
-    assertRecordMatchesEmployee(resultSet, "2", employee2);
-    assertThat(resultSet.next()).isFalse();
-  }
-
-  @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
     PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).writeInt("id", 3).create();
+        .writeObject("age", 55).writeInt("id", 3).create();
     employees.put("1", pdx1);
 
     awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
@@ -254,7 +241,7 @@ public class JdbcAsyncWriterIntegrationTest {
 
   private SqlHandler createSqlHandler()
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    return new SqlHandler(new TestableConnectionManager(),
+    return new SqlHandler(new TestableConnectionManager(), new TableKeyColumnManager(),
         TestConfigService.getTestConfigService());
   }
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcConnectorExceptionTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcConnectorExceptionTest.java
index fe138db..f48151b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcConnectorExceptionTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcConnectorExceptionTest.java
@@ -37,11 +37,49 @@ public class JdbcConnectorExceptionTest {
   }
 
   @Test
+  public void returnsExceptionWithCauseForNonSqlExceptionAndNonSqlNestedCause() {
+    IllegalStateException cause = new IllegalStateException(new IllegalStateException());
+    Exception e = JdbcConnectorException.createException(cause);
+    assertThat(e.getCause()).isNotNull().isSameAs(cause);
+  }
+
+  @Test
+  public void returnsExceptionWithCauseForNonSqlExceptionWithMessage() {
+    Exception e = JdbcConnectorException.createException("message", new IllegalStateException());
+    assertThat(e.getMessage()).isEqualTo("message");
+    assertThat(e.getCause()).isNotNull().isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
   public void returnsExceptionWithNoCauseForSqlException() {
-    Exception sqlException = new SQLException();
+    Exception sqlException = new SQLException("mySqlExceptionMessage");
     Exception e = JdbcConnectorException.createException(sqlException);
     assertThat(e.getCause()).isNull();
-    assertThat(e.getMessage())
+    assertThat(e.getMessage()).contains("mySqlExceptionMessage")
+        .contains(this.getClass().getCanonicalName() + "." + testName.getMethodName());
+  }
+
+  @Test
+  public void returnsExceptionWithNoCauseForSqlExceptionWithMessage() {
+    Exception sqlException = new SQLException();
+    Exception e = JdbcConnectorException.createException("message", sqlException);
+    assertThat(e.getCause()).isNull();
+    assertThat(e.getMessage()).startsWith("message")
         .contains(this.getClass().getCanonicalName() + "." + testName.getMethodName());
   }
+
+  @Test
+  public void returnsExceptionWithNoCauseForNestedSqlException() {
+    Exception sqlException = new SQLException();
+    Exception e = JdbcConnectorException.createException(new IllegalStateException(sqlException));
+    assertThat(e.getMessage())
+        .contains(this.getClass().getCanonicalName() + "." + testName.getMethodName())
+        .contains("SQLException").contains("IllegalStateException");
+  }
+
+  @Test
+  public void returnsExceptionForNull() {
+    Exception e = JdbcConnectorException.createException(null);
+    assertThat(e.getCause()).isNull();
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
index adeb48d..4c4e16a 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -20,10 +20,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Date;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.awaitility.Awaitility;
 import org.junit.After;
@@ -33,9 +37,15 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
+import org.apache.geode.pdx.internal.AutoSerializableManager;
 import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.rules.GfshCommandRule;
@@ -43,7 +53,6 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 /**
  * End-to-end dunits for jdbc connector
- *
  */
 @Category(DistributedTest.class)
 public class JdbcDUnitTest implements Serializable {
@@ -54,30 +63,78 @@ public class JdbcDUnitTest implements Serializable {
   private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
   private static final String CONNECTION_NAME = "TestConnection";
 
+
   @Rule
   public transient GfshCommandRule gfsh = new GfshCommandRule();
 
   @Rule
-  public ClusterStartupRule startupRule = new ClusterStartupRule();
+  public transient ClusterStartupRule startupRule = new ClusterStartupRule();
 
   @Rule
   public SerializableTestName testName = new SerializableTestName();
 
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
   private MemberVM server;
+  private MemberVM locator;
 
   @Before
   public void setup() throws Exception {
-    MemberVM locator = startupRule.startLocatorVM(0);
+    locator = startupRule.startLocatorVM(0);
     gfsh.connectAndVerify(locator);
-    server = startupRule.startServerVM(1, locator.getPort());
-    server.invoke(() -> createTable());
   }
 
   private void createTable() throws SQLException {
-    Connection connection = DriverManager.getConnection(CONNECTION_URL);
-    Statement statement = connection.createStatement();
-    statement.execute("Create Table " + TABLE_NAME
-        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
+    server.invoke(() -> {
+      Connection connection = DriverManager.getConnection(CONNECTION_URL);
+      Statement statement = connection.createStatement();
+      statement.execute("Create Table " + TABLE_NAME
+          + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    });
+  }
+
+  private void createTableForAllSupportedFields() throws SQLException {
+    server = startupRule.startServerVM(1,
+        x -> x.withConnectionToLocator(locator.getPort()).withPDXReadSerialized());
+    server.invoke(() -> {
+      Connection connection = DriverManager.getConnection(CONNECTION_URL);
+      Statement statement = connection.createStatement();
+      statement.execute("Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
+          + "aboolean smallint, " + "abyte smallint, " + "ashort smallint, " + "anint int, "
+          + "along bigint, " + "afloat float, " + "adouble float, " + "astring varchar(10), "
+          + "adate timestamp, " + "anobject varchar(20), " + "abytearray blob(100), "
+          + "achar char(1))");
+    });
+  }
+
+  /** Runs on the server VM: inserts one row for key, binding values in table column order. */
+  private void insertDataForAllSupportedFieldsTable(String key,
+      ClassWithSupportedPdxFields classWithSupportedPdxFields) {
+    server.invoke(() -> {
+      ClassWithSupportedPdxFields data = classWithSupportedPdxFields;
+      String insertQuery = "Insert into " + TABLE_NAME + " values (" + "?,?,?,?,?,?,?,?,?,?,?,?,?)";
+      // try-with-resources closes the statement and the connection even if the insert fails
+      try (Connection connection = DriverManager.getConnection(CONNECTION_URL);
+          PreparedStatement statement = connection.prepareStatement(insertQuery)) {
+        statement.setObject(1, key);
+        statement.setObject(2, data.isAboolean());
+        statement.setObject(3, data.getAbyte());
+        statement.setObject(4, data.getAshort());
+        statement.setObject(5, data.getAnint());
+        statement.setObject(6, data.getAlong());
+        statement.setObject(7, data.getAfloat());
+        statement.setObject(8, data.getAdouble());
+        statement.setObject(9, data.getAstring());
+        statement.setObject(10, data.getAdate());
+        statement.setObject(11, data.getAnobject());
+        statement.setObject(12, data.getAbytearray());
+        statement.setObject(13, String.valueOf(data.getAchar()));
+        statement.execute();
+      }
+    });
   }
 
   @After
@@ -105,7 +162,8 @@ public class JdbcDUnitTest implements Serializable {
 
   @Test
   public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception {
-    createRegion(true, false, false);
+    createTable();
+    createRegionUsingGfsh(true, false, false);
     createJdbcConnection();
 
     server.invoke(() -> {
@@ -114,15 +172,16 @@ public class JdbcDUnitTest implements Serializable {
               .writeString("name", "Emp1").writeInt("age", 55).create();
       Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       assertThatThrownBy(() -> region.put("key1", pdxEmployee1))
-          .isExactlyInstanceOf(IllegalStateException.class).hasMessage(
+          .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
               "JDBC mapping for region employees not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
     });
   }
 
   @Test
   public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exception {
-    IgnoredException.addIgnoredException("IllegalStateException");
-    createRegion(false, true, false);
+    createTable();
+    IgnoredException.addIgnoredException("JdbcConnectorException");
+    createRegionUsingGfsh(false, true, false);
     createJdbcConnection();
 
     server.invoke(() -> {
@@ -143,7 +202,8 @@ public class JdbcDUnitTest implements Serializable {
 
   @Test
   public void throwsExceptionWhenNoMappingMatches() throws Exception {
-    createRegion(true, false, false);
+    createTable();
+    createRegionUsingGfsh(true, false, false);
     createJdbcConnection();
     createMapping("NoSuchRegion", CONNECTION_NAME);
 
@@ -153,14 +213,15 @@ public class JdbcDUnitTest implements Serializable {
               .writeString("name", "Emp1").writeInt("age", 55).create();
       Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       assertThatThrownBy(() -> region.put("key1", pdxEmployee1))
-          .isExactlyInstanceOf(IllegalStateException.class).hasMessage(
+          .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
               "JDBC mapping for region employees not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
     });
   }
 
   @Test
   public void throwsExceptionWhenNoConnectionExists() throws Exception {
-    createRegion(true, false, false);
+    createTable();
+    createRegionUsingGfsh(true, false, false);
     createMapping(REGION_NAME, CONNECTION_NAME);
 
     server.invoke(() -> {
@@ -169,14 +230,15 @@ public class JdbcDUnitTest implements Serializable {
               .writeString("name", "Emp1").writeInt("age", 55).create();
       Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
       assertThatThrownBy(() -> region.put("key1", pdxEmployee1))
-          .isExactlyInstanceOf(IllegalStateException.class).hasMessage(
+          .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
               "JDBC connection with name TestConnection not found. Create the connection with the gfsh command 'create jdbc-connection'");
     });
   }
 
   @Test
   public void putWritesToDB() throws Exception {
-    createRegion(true, false, false);
+    createTable();
+    createRegionUsingGfsh(true, false, false);
     createJdbcConnection();
     createMapping(REGION_NAME, CONNECTION_NAME);
     server.invoke(() -> {
@@ -192,7 +254,8 @@ public class JdbcDUnitTest implements Serializable {
 
   @Test
   public void putAsyncWritesToDB() throws Exception {
-    createRegion(true, false, false);
+    createTable();
+    createRegionUsingGfsh(true, false, false);
     createJdbcConnection();
     createMapping(REGION_NAME, CONNECTION_NAME);
     server.invoke(() -> {
@@ -208,20 +271,23 @@ public class JdbcDUnitTest implements Serializable {
 
   @Test
   public void getReadsFromEmptyDB() throws Exception {
-    createRegion(false, false, true);
+    createTable();
+    createRegionUsingGfsh(false, false, true);
     createJdbcConnection();
     createMapping(REGION_NAME, CONNECTION_NAME);
     server.invoke(() -> {
       String key = "emp1";
       Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
-      region.get(key);
+      Object value = region.get(key);
+      assertThat(value).isNull();
       assertThat(region.size()).isEqualTo(0);
     });
   }
 
   @Test
   public void getReadsFromDB() throws Exception {
-    createRegion(true, false, true);
+    createTable();
+    createRegionUsingGfsh(true, false, true);
     createJdbcConnection();
     createMapping(REGION_NAME, CONNECTION_NAME);
     server.invoke(() -> {
@@ -235,9 +301,96 @@ public class JdbcDUnitTest implements Serializable {
       region.invalidate(key);
 
       PdxInstance result = (PdxInstance) region.get(key);
-      assertThat(result.getField("id")).isEqualTo(pdxEmployee1.getField("id"));
-      assertThat(result.getField("name")).isEqualTo(pdxEmployee1.getField("name"));
-      assertThat(result.getField("age")).isEqualTo(pdxEmployee1.getField("age"));
+      assertThat(result.getFieldNames()).hasSize(3);
+      assertThat(result.getField("id")).isEqualTo(key);
+      assertThat(result.getField("name")).isEqualTo("Emp1");
+      assertThat(result.getField("age")).isEqualTo(55);
+    });
+  }
+
+  @Test
+  public void getReadsFromDBWithPdxClassName() throws Exception {
+    createTable();
+    createRegionUsingGfsh(true, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME, Employee.class.getName(), false);
+    server.invoke(() -> {
+      String key = "id1";
+      Employee value = new Employee("Emp1", 55);
+      Region region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(key, value);
+      region.invalidate(key);
+
+      Employee result = (Employee) region.get(key);
+      assertThat(result.getName()).isEqualTo("Emp1");
+      assertThat(result.getAge()).isEqualTo(55);
+    });
+  }
+
+  @Test
+  public void clientGetReadsFromDBWithPdxClassName() throws Exception {
+    createTableForAllSupportedFields();
+    ClientVM client = getClientVM();
+    createClientRegion(client);
+
+    createRegionUsingGfsh(true, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME, ClassWithSupportedPdxFields.class.getName(), false);
+    client.invoke(() -> {
+      String key = "id1";
+      ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(true, (byte) 1, (short) 2,
+          3, 4, 5.5f, 6.0, "BigEmp", new Date(100000), "BigEmpObject", new byte[] {1, 2}, 'c');
+      Region<String, ClassWithSupportedPdxFields> region =
+          ClusterStartupRule.getClientCache().getRegion(REGION_NAME);
+      region.put(key, value);
+      region.invalidate(key);
+
+      ClassWithSupportedPdxFields result = region.get(key);
+      assertThat(result).isEqualTo(value);
+    });
+  }
+
+  @Test
+  public void clientRegistersPdxAndReadsFromDBWithPdxClassName() throws Exception {
+    createTableForAllSupportedFields();
+    ClientVM client = getClientVM();
+    createClientRegion(client);
+    createRegionUsingGfsh(true, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME, ClassWithSupportedPdxFields.class.getName(), false);
+    String key = "id1";
+    ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(true, (byte) 1, (short) 2,
+        3, 4, 5.5f, 6.0, "BigEmp", new Date(100000), "BigEmpObject", new byte[] {1, 2}, 'c');
+
+    server.invoke(() -> {
+      insertDataForAllSupportedFieldsTable(key, value);
+    });
+
+    client.invoke(() -> {
+      ClusterStartupRule.getClientCache().registerPdxMetaData(new ClassWithSupportedPdxFields());
+
+      Region<String, ClassWithSupportedPdxFields> region =
+          ClusterStartupRule.getClientCache().getRegion(REGION_NAME);
+
+      ClassWithSupportedPdxFields result = region.get(key);
+      assertThat(result).isEqualTo(value);
+    });
+  }
+
+  private ClientVM getClientVM() throws Exception {
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> {
+      System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
+      cf.addPoolLocator("localhost", locator.getPort());
+      cf.setPdxSerializer(
+          new ReflectionBasedAutoSerializer(ClassWithSupportedPdxFields.class.getName()));
+    };
+    return startupRule.startClientVM(2, new Properties(), cacheSetup);
+  }
+
+  private void createClientRegion(ClientVM client) {
+    client.invoke(() -> {
+      ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(REGION_NAME);
     });
   }
 
@@ -254,7 +407,8 @@ public class JdbcDUnitTest implements Serializable {
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
   }
 
-  private void createRegion(boolean withCacheWriter, boolean withAsyncWriter, boolean withLoader) {
+  private void createRegionUsingGfsh(boolean withCacheWriter, boolean withAsyncWriter,
+      boolean withLoader) {
     StringBuffer createRegionCmd = new StringBuffer();
     createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE ");
     if (withCacheWriter) {
@@ -272,8 +426,18 @@ public class JdbcDUnitTest implements Serializable {
   }
 
   private void createMapping(String regionName, String connectionName) {
+    createMapping(regionName, connectionName, null);
+  }
+
+  private void createMapping(String regionName, String connectionName, String pdxClassName) {
+    createMapping(regionName, connectionName, pdxClassName, true);
+  }
+
+  private void createMapping(String regionName, String connectionName, String pdxClassName,
+      boolean valueContainsPrimaryKey) {
     final String commandStr = "create jdbc-mapping --region=" + regionName + " --connection="
-        + connectionName + " --value-contains-primary-key";
+        + connectionName + (valueContainsPrimaryKey ? " --value-contains-primary-key" : "")
+        + (pdxClassName != null ? " --pdx-class-name=" + pdxClassName : "");
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
   }
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index f3fd61b..bbdb571 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -17,13 +17,18 @@ package org.apache.geode.connectors.jdbc;
 import static org.apache.geode.cache.RegionShortcut.REPLICATE;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.Statement;
+import java.util.Date;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheFactory;
@@ -32,10 +37,14 @@ import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfigExistsException;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.connectors.jdbc.internal.TableKeyColumnManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
+import org.apache.geode.pdx.internal.AutoSerializableManager;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
@@ -49,14 +58,19 @@ public class JdbcLoaderIntegrationTest {
   private Connection connection;
   private Statement statement;
 
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
   @Before
   public void setUp() throws Exception {
+    System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
     cache = (InternalCache) new CacheFactory().set("locators", "").set("mcast-port", "0")
-        .setPdxReadSerialized(false).create();
+        .setPdxReadSerialized(false)
+        .setPdxSerializer(
+            new ReflectionBasedAutoSerializer(ClassWithSupportedPdxFields.class.getName()))
+        .create();
     connection = DriverManager.getConnection(CONNECTION_URL);
     statement = connection.createStatement();
-    statement.execute("Create Table " + REGION_TABLE_NAME
-        + " (id varchar(10) primary key not null, name varchar(10), age int)");
   }
 
   @After
@@ -65,6 +79,19 @@ public class JdbcLoaderIntegrationTest {
     closeDB();
   }
 
+  private void createEmployeeTable() throws Exception {
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  private void createClassWithSupportedPdxFieldsTable() throws Exception {
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, " + "aboolean smallint, " + "abyte smallint, "
+        + "ashort smallint, " + "anint int, " + "along bigint, " + "afloat float, "
+        + "adouble float, " + "astring varchar(10), " + "adate timestamp, "
+        + "anobject varchar(20), " + "abytearray blob(100), " + "achar char(1))");
+  }
+
   private void closeDB() throws Exception {
     if (statement == null) {
       statement = connection.createStatement();
@@ -79,32 +106,122 @@ public class JdbcLoaderIntegrationTest {
 
   @Test
   public void verifySimpleGet() throws Exception {
-    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'Emp1', 21)");
-    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME);
+    createEmployeeTable();
+    statement
+        .execute("Insert into " + REGION_TABLE_NAME + " (id, name, age) values('1', 'Emp1', 21)");
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME, null, false);
+    PdxInstance pdx = region.get("1");
+
+    assertThat(pdx.getFieldNames()).hasSize(2);
+    assertThat(pdx.getField("name")).isEqualTo("Emp1");
+    assertThat(pdx.getField("age")).isEqualTo(21);
+  }
+
+  @Test
+  public void verifySimpleGetWithPrimaryKeyInValue() throws Exception {
+    createEmployeeTable();
+    statement
+        .execute("Insert into " + REGION_TABLE_NAME + " (id, name, age) values('1', 'Emp1', 21)");
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME, null, true);
     PdxInstance pdx = region.get("1");
 
+    assertThat(pdx.getFieldNames()).hasSize(3);
+    assertThat(pdx.getField("id")).isEqualTo("1");
     assertThat(pdx.getField("name")).isEqualTo("Emp1");
     assertThat(pdx.getField("age")).isEqualTo(21);
   }
 
   @Test
+  public void verifyGetWithPdxClassName() throws Exception {
+    createEmployeeTable();
+    statement
+        .execute("Insert into " + REGION_TABLE_NAME + "(id, name, age) values('1', 'Emp1', 21)");
+    Region<String, Employee> region =
+        createRegionWithJDBCLoader(REGION_TABLE_NAME, Employee.class.getName(), false);
+    createPdxType();
+
+    Employee value = region.get("1");
+
+    assertThat(value.getName()).isEqualTo("Emp1");
+    assertThat(value.getAge()).isEqualTo(21);
+  }
+
+  @Test
+  public void verifyGetWithSupportedFieldsWithPdxClassName() throws Exception {
+    createClassWithSupportedPdxFieldsTable();
+    ClassWithSupportedPdxFields classWithSupportedPdxFields =
+        createClassWithSupportedPdxFieldsForInsert();
+    insertIntoClassWithSupportedPdxFieldsTable("1", classWithSupportedPdxFields);
+    Region<String, ClassWithSupportedPdxFields> region = createRegionWithJDBCLoader(
+        REGION_TABLE_NAME, ClassWithSupportedPdxFields.class.getName(), false);
+
+    createPdxType(classWithSupportedPdxFields);
+
+    ClassWithSupportedPdxFields value = region.get("1");
+    assertThat(value).isEqualTo(classWithSupportedPdxFields);
+  }
+
+  private void createPdxType() throws IOException {
+    createPdxType(new Employee("name", 45));
+  }
+
+  private void createPdxType(Object value) throws IOException {
+    // the following serialization will add a pdxType
+    BlobHelper.serializeToBlob(value);
+  }
+
+  @Test
   public void verifySimpleMiss() throws Exception {
-    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME);
+    createEmployeeTable();
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME, null, false);
     PdxInstance pdx = region.get("1");
     assertThat(pdx).isNull();
   }
 
-  private SqlHandler createSqlHandler()
+  private SqlHandler createSqlHandler(String pdxClassName, boolean primaryKeyInValue)
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    return new SqlHandler(new TestableConnectionManager(),
-        TestConfigService.getTestConfigService());
+    return new SqlHandler(new TestableConnectionManager(), new TableKeyColumnManager(),
+        TestConfigService.getTestConfigService((InternalCache) cache, pdxClassName,
+            primaryKeyInValue));
   }
 
-  private Region<String, PdxInstance> createRegionWithJDBCLoader(String regionName)
+  private <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
+      boolean primaryKeyInValue)
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    JdbcLoader<String, PdxInstance> jdbcLoader = new JdbcLoader<>(createSqlHandler(), cache);
-    RegionFactory<String, PdxInstance> regionFactory = cache.createRegionFactory(REPLICATE);
+    JdbcLoader<K, V> jdbcLoader =
+        new JdbcLoader<>(createSqlHandler(pdxClassName, primaryKeyInValue), cache);
+    RegionFactory<K, V> regionFactory = cache.createRegionFactory(REPLICATE);
     regionFactory.setCacheLoader(jdbcLoader);
     return regionFactory.create(regionName);
   }
+
+  /**
+   * Builds the object whose fields are inserted into the table and later compared to the load.
+   */
+  private ClassWithSupportedPdxFields createClassWithSupportedPdxFieldsForInsert() {
+    return new ClassWithSupportedPdxFields(true, (byte) 1, (short) 2, 3, 4, 5.5f, 6.0, "BigEmp",
+        new Date(100000), "BigEmpObject", new byte[] {1, 2}, 'c');
+  }
+
+  /** Inserts one row for id; values are bound in the column order of the created table. */
+  private void insertIntoClassWithSupportedPdxFieldsTable(String id,
+      ClassWithSupportedPdxFields classWithSupportedPdxFields) throws Exception {
+    String sql = "Insert into " + REGION_TABLE_NAME + " values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+    try (PreparedStatement ps = connection.prepareStatement(sql)) {
+      ps.setObject(1, id);
+      ps.setObject(2, classWithSupportedPdxFields.isAboolean());
+      ps.setObject(3, classWithSupportedPdxFields.getAbyte());
+      ps.setObject(4, classWithSupportedPdxFields.getAshort());
+      ps.setObject(5, classWithSupportedPdxFields.getAnint());
+      ps.setObject(6, classWithSupportedPdxFields.getAlong());
+      ps.setObject(7, classWithSupportedPdxFields.getAfloat());
+      ps.setObject(8, classWithSupportedPdxFields.getAdouble());
+      ps.setObject(9, classWithSupportedPdxFields.getAstring());
+      ps.setObject(10, classWithSupportedPdxFields.getAdate());
+      ps.setObject(11, classWithSupportedPdxFields.getAnobject());
+      ps.setObject(12, classWithSupportedPdxFields.getAbytearray());
+      ps.setObject(13, String.valueOf(classWithSupportedPdxFields.getAchar()));
+      ps.executeUpdate();
+    }
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
index 649496c..e4cbb16 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.internal.ConnectionConfigExistsException;
 import org.apache.geode.connectors.jdbc.internal.RegionMappingExistsException;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.connectors.jdbc.internal.TableKeyColumnManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
 import org.apache.geode.internal.cache.InternalCache;
@@ -126,7 +127,7 @@ public class JdbcWriterIntegrationTest {
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
     PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName())
-        .writeString("name", "Emp1").writeInt("age", 55).writeInt("id", 3).create();
+        .writeString("name", "Emp1").writeInt("age", 55).writeString("id", "3").create();
     employees.put("1", pdxInstanceWithId);
 
     ResultSet resultSet =
@@ -225,7 +226,7 @@ public class JdbcWriterIntegrationTest {
 
   private SqlHandler createSqlHandler()
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    return new SqlHandler(new TestableConnectionManager(),
+    return new SqlHandler(new TestableConnectionManager(), new TableKeyColumnManager(),
         TestConfigService.getTestConfigService());
   }
 
@@ -234,6 +235,6 @@ public class JdbcWriterIntegrationTest {
     assertThat(resultSet.next()).isTrue();
     assertThat(resultSet.getString("id")).isEqualTo(key);
     assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
-    assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
+    assertThat(resultSet.getInt("age")).isEqualTo(employee.getAge());
   }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index 2b3db92..f47be9d 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -17,50 +17,52 @@ package org.apache.geode.connectors.jdbc.internal;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import junitparams.JUnitParamsRunner;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
 
+import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.connectors.jdbc.JdbcConnectorException;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.internal.PdxInstanceImpl;
 import org.apache.geode.pdx.internal.PdxType;
 import org.apache.geode.test.junit.categories.UnitTest;
 
+@RunWith(JUnitParamsRunner.class)
 @Category(UnitTest.class)
 public class SqlHandlerTest {
+  private static final String CONNECTION_CONFIG_NAME = "testConnectionConfig";
   private static final String REGION_NAME = "testRegion";
   private static final String TABLE_NAME = "testTable";
-  private static final Object COLUMN_VALUE_1 = "columnValue1";
   private static final String COLUMN_NAME_1 = "columnName1";
+  private static final Object COLUMN_VALUE_1 = "columnValue1";
   private static final Object COLUMN_VALUE_2 = "columnValue2";
   private static final String COLUMN_NAME_2 = "columnName2";
   private static final String KEY_COLUMN = "keyColumn";
+  private static final String PDX_FIELD_NAME_1 = COLUMN_NAME_1.toLowerCase();
+  private static final String PDX_FIELD_NAME_2 = COLUMN_NAME_2.toLowerCase();
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -84,8 +86,10 @@ public class SqlHandlerTest {
     manager = mock(DataSourceManager.class);
     dataSource = mock(JdbcDataSource.class);
     connectionConfig = mock(ConnectionConfiguration.class);
+    when(connectionConfig.getName()).thenReturn(CONNECTION_CONFIG_NAME);
     when(connectionConfig.getUrl()).thenReturn("fake:url");
     region = mock(Region.class);
+    when(region.getName()).thenReturn(REGION_NAME);
     cache = mock(InternalCache.class);
     connection = mock(Connection.class);
     when(region.getRegionService()).thenReturn(cache);
@@ -97,13 +101,14 @@ public class SqlHandlerTest {
     value = mock(PdxInstanceImpl.class);
     when(value.getPdxType()).thenReturn(mock(PdxType.class));
 
-    when(connectorService.getConnectionConfig(any())).thenReturn(connectionConfig);
+    when(connectorService.getConnectionConfig(CONNECTION_CONFIG_NAME)).thenReturn(connectionConfig);
 
     regionMapping = mock(RegionMapping.class);
+    when(regionMapping.getConnectionConfigName()).thenReturn(CONNECTION_CONFIG_NAME);
     when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
     when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
     when(regionMapping.getRegionToTableName()).thenReturn(TABLE_NAME);
-    when(connectorService.getMappingForRegion(any())).thenReturn(regionMapping);
+    when(connectorService.getMappingForRegion(REGION_NAME)).thenReturn(regionMapping);
 
 
     when(manager.getDataSource(any())).thenReturn(this.dataSource);
@@ -114,20 +119,35 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void readReturnsNullIfNoKeyProvided() throws Exception {
+  public void verifyCloseCallsManagerClose() {
+    handler.close();
+
+    verify(manager).close();
+  }
+
+  @Test
+  public void readThrowsIfNoKeyProvided() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     handler.read(region, null);
   }
 
   @Test
-  public void usesPdxFactoryForClassWhenExists() throws Exception {
-    setupEmptyResultSet();
-    String pdxClassName = "classname";
-    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
-    handler.read(region, new Object());
+  public void readThrowsIfNoMapping() throws Exception {
+    thrown.expect(JdbcConnectorException.class);
+    handler.read(mock(Region.class), new Object());
+  }
 
-    verify(cache).createPdxInstanceFactory(pdxClassName);
-    verifyNoMoreInteractions(cache);
+  @Test
+  public void readThrowsIfNoConnectionConfig() throws Exception {
+    Region region2 = mock(Region.class);
+    when(region2.getName()).thenReturn("region2");
+    RegionMapping regionMapping2 = mock(RegionMapping.class);
+    when(regionMapping2.getConnectionConfigName()).thenReturn("bogus connection name");
+    when(regionMapping2.getRegionName()).thenReturn("region2");
+    when(connectorService.getMappingForRegion("region2")).thenReturn(regionMapping2);
+
+    thrown.expect(JdbcConnectorException.class);
+    handler.read(region2, new Object());
   }
 
   @Test
@@ -141,21 +161,6 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void usesPbxFactoryForNoPbxClassWhenClassNonExistent() throws Exception {
-    setupEmptyResultSet();
-    handler.read(region, new Object());
-
-    verify(cache).createPdxInstanceFactory("no class", false);
-    verifyNoMoreInteractions(cache);
-  }
-
-  @Test
-  public void readReturnsNullIfNoResultsReturned() throws Exception {
-    setupEmptyResultSet();
-    assertThat(handler.read(region, new Object())).isNull();
-  }
-
-  @Test
   public void throwsExceptionIfQueryFails() throws Exception {
     when(statement.executeQuery()).thenThrow(SQLException.class);
 
@@ -164,63 +169,43 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void readReturnsDataFromAllResultColumns() throws Exception {
-    ResultSet result = mock(ResultSet.class);
-    setupResultSet(result);
-    when(result.next()).thenReturn(true).thenReturn(false);
-    when(statement.executeQuery()).thenReturn(result);
-
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
-
-    String fieldName1 = COLUMN_NAME_1.toLowerCase();
-    String fieldName2 = COLUMN_NAME_2.toLowerCase();
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(fieldName1);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(fieldName2);
-    handler.read(region, new Object());
-    verify(factory).writeField(fieldName1, COLUMN_VALUE_1, Object.class);
-    verify(factory).writeField(fieldName2, COLUMN_VALUE_2, Object.class);
-    verify(factory).create();
+  public void writeThrowsExceptionIfValueIsNullAndNotDoingDestroy() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    handler.write(region, Operation.UPDATE, new Object(), null);
   }
 
   @Test
-  public void readResultOmitsKeyColumnIfNotInValue() throws Exception {
-    ResultSet result = mock(ResultSet.class);
-    setupResultSet(result);
-    when(result.next()).thenReturn(true).thenReturn(false);
-    when(statement.executeQuery()).thenReturn(result);
-    when(tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME)).thenReturn(COLUMN_NAME_1);
+  public void writeWithCharField() throws Exception {
+    String fieldName = "fieldName";
+    Object fieldValue = 'S';
+    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
+    when(value.getField(fieldName)).thenReturn(fieldValue);
 
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+    when(statement.executeUpdate()).thenReturn(1);
+    Object createKey = "createKey";
+    handler.write(region, Operation.CREATE, createKey, value);
 
-    String fieldName2 = COLUMN_NAME_2.toLowerCase();
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(fieldName2);
-    handler.read(region, new Object());
-    verify(factory).writeField(fieldName2, COLUMN_VALUE_2, Object.class);
-    verify(factory, times(1)).writeField(any(), any(), any());
-    verify(factory).create();
+    verify(statement).setObject(1, fieldValue.toString());
+    verify(statement).setObject(2, createKey);
+    verify(statement).close();
   }
 
   @Test
-  public void throwsExceptionIfMoreThatOneResultReturned() throws Exception {
-    ResultSet result = mock(ResultSet.class);
-    setupResultSet(result);
-    when(result.next()).thenReturn(true);
-    when(result.getStatement()).thenReturn(mock(PreparedStatement.class));
-    when(statement.executeQuery()).thenReturn(result);
+  public void writeWithNonCharField() throws Exception {
+    String fieldName = "fieldName";
+    int fieldValue = 100;
+    when(regionMapping.getColumnNameForField(fieldName)).thenReturn(fieldName);
+    when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
+    when(value.getField(fieldName)).thenReturn(fieldValue);
 
-    when(cache.createPdxInstanceFactory(anyString(), anyBoolean()))
-        .thenReturn(mock(PdxInstanceFactory.class));
-
-    thrown.expect(JdbcConnectorException.class);
-    handler.read(region, new Object());
-  }
+    when(statement.executeUpdate()).thenReturn(1);
+    Object createKey = "createKey";
+    handler.write(region, Operation.CREATE, createKey, value);
 
-  @Test
-  public void writeThrowsExceptionIfValueIsNullAndNotDoingDestroy() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    handler.write(region, Operation.UPDATE, new Object(), null);
+    verify(statement).setObject(1, fieldValue);
+    verify(statement).setObject(2, createKey);
+    verify(statement).close();
   }
 
   @Test
@@ -272,6 +257,12 @@ public class SqlHandlerTest {
   }
 
   @Test
+  public void writesWithUnsupportedOperationThrows() throws Exception {
+    thrown.expect(InternalGemFireException.class);
+    handler.write(region, Operation.INVALIDATE, new Object(), value);
+  }
+
+  @Test
   public void preparedStatementClearedAfterExecution() throws Exception {
     when(statement.executeUpdate()).thenReturn(1);
     handler.write(region, Operation.CREATE, new Object(), value);
@@ -357,26 +348,6 @@ public class SqlHandlerTest {
     verify(insertStatement).close();
   }
 
-  @Test
-  public void whenStatementUpdatesMultipleRowsExceptionThrown() throws Exception {
-    when(statement.executeUpdate()).thenReturn(2);
-    thrown.expect(IllegalStateException.class);
-    handler.write(region, Operation.CREATE, new Object(), value);
-    verify(statement).close();
-  }
-
-  private void setupResultSet(ResultSet result) throws SQLException {
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(result.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(2);
-
-    when(result.getObject(1)).thenReturn(COLUMN_VALUE_1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-
-    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
-    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
-  }
-
   private void setupEmptyResultSet() throws SQLException {
     ResultSet result = mock(ResultSet.class);
     when(result.next()).thenReturn(false);
@@ -424,28 +395,6 @@ public class SqlHandlerTest {
     assertThat(columnValueList.get(0).getColumnName()).isEqualTo(KEY_COLUMN);
   }
 
-  @Test
-  public void usesMappedPdxFieldNameWhenReading() throws Exception {
-    ResultSet result = mock(ResultSet.class);
-    setupResultSet(result);
-    when(result.next()).thenReturn(true).thenReturn(false);
-    when(statement.executeQuery()).thenReturn(result);
-
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
-
-    List<ColumnValue> columnList = new ArrayList<>();
-
-    String fieldName1 = "pdxFieldName1";
-    String fieldName2 = "pdxFieldName2";
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(fieldName1);
-    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(fieldName2);
-    handler.executeReadStatement(statement, columnList, factory, regionMapping, "keyColumn");
-    verify(factory).writeField(fieldName1, COLUMN_VALUE_1, Object.class);
-    verify(factory).writeField(fieldName2, COLUMN_VALUE_2, Object.class);
-    verify(factory).create();
-  }
-
   private ResultSet getPrimaryKeysMetaData() throws SQLException {
     DatabaseMetaData metadata = mock(DatabaseMetaData.class);
     ResultSet resultSet = mock(ResultSet.class);
@@ -469,5 +418,4 @@ public class SqlHandlerTest {
         .isInstanceOf(SQLException.class).hasMessage("test exception");
   }
 
-
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
new file mode 100644
index 0000000..d71a11d
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Date;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.FieldType;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxField;
+import org.apache.geode.pdx.internal.PdxType;
+import org.apache.geode.pdx.internal.TypeRegistry;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@RunWith(JUnitParamsRunner.class)
+@Category(UnitTest.class)
+public class SqlToPdxInstanceCreatorTest {
+
+  private static final String COLUMN_NAME_1 = "columnName1";
+  private static final Object COLUMN_VALUE_1 = "columnValue1";
+  private static final Object COLUMN_VALUE_2 = "columnValue2";
+  private static final String COLUMN_NAME_2 = "columnName2";
+  private static final String KEY_COLUMN = "keyColumn";
+  private static final String PDX_FIELD_NAME_1 = COLUMN_NAME_1.toLowerCase();
+  private static final String PDX_FIELD_NAME_2 = COLUMN_NAME_2.toLowerCase();
+
+  private InternalCache cache;
+  private SqlToPdxInstanceCreator sqlToPdxInstanceCreator;
+  private RegionMapping regionMapping;
+  private ResultSet resultSet;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setup() throws Exception {
+    cache = mock(InternalCache.class);
+    regionMapping = mock(RegionMapping.class);
+    resultSet = mock(ResultSet.class);
+    sqlToPdxInstanceCreator =
+        new SqlToPdxInstanceCreator(cache, regionMapping, resultSet, KEY_COLUMN);
+  }
+
+  @Test
+  public void usesPdxFactoryForClassWhenExists() throws Exception {
+    String pdxClassName = "classname";
+    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
+    when(resultSet.next()).thenReturn(false);
+
+    sqlToPdxInstanceCreator.create();
+
+    verify(cache).createPdxInstanceFactory(pdxClassName);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void usesPdxFactoryForNoPdxClassWhenClassNonExistent() throws Exception {
+    when(resultSet.next()).thenReturn(false);
+
+    sqlToPdxInstanceCreator.create();
+
+    verify(cache).createPdxInstanceFactory("no class", false);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readReturnsNullIfNoResultsReturned() throws Exception {
+    when(resultSet.next()).thenReturn(false);
+    PdxInstance pdxInstance = sqlToPdxInstanceCreator.create();
+    assertThat(pdxInstance).isNull();
+  }
+
+  @Test
+  public void readResultOmitsKeyColumnIfNotInValue() throws Exception {
+    setupResultSetForTwoObjectColumns(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(PDX_FIELD_NAME_2);
+    sqlToPdxInstanceCreator =
+        new SqlToPdxInstanceCreator(cache, regionMapping, resultSet, COLUMN_NAME_1);
+
+    sqlToPdxInstanceCreator.create();
+
+    verify(factory).writeObject(PDX_FIELD_NAME_2, COLUMN_VALUE_2);
+    verify(factory, times(1)).writeObject(any(), any());
+    verify(factory).create();
+  }
+
+  @Test
+  public void readReturnsDataFromAllResultColumns() throws Exception {
+    setupResultSetForTwoObjectColumns(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(PDX_FIELD_NAME_2);
+
+    sqlToPdxInstanceCreator.create();
+
+    verify(factory).writeObject(PDX_FIELD_NAME_1, COLUMN_VALUE_1);
+    verify(factory).writeObject(PDX_FIELD_NAME_2, COLUMN_VALUE_2);
+    verify(factory).create();
+  }
+
+  @Test
+  public void usesMappedPdxFieldNameWhenReading() throws Exception {
+    setupResultSetForTwoObjectColumns(resultSet);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+    String fieldName1 = "pdxFieldName1";
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(fieldName1);
+
+    sqlToPdxInstanceCreator.create();
+
+    verify(factory).writeObject(fieldName1, COLUMN_VALUE_1);
+    verify(factory).create();
+  }
+
+  @Test
+  @Parameters(source = FieldType.class)
+  public void readWritesFieldGivenPdxFieldType(FieldType fieldType) throws Exception {
+    setupResultSet(resultSet, fieldType);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+
+    sqlToPdxInstanceCreator.create();
+
+    verifyPdxFactoryWrite(factory, fieldType);
+    verify(factory).create();
+  }
+
+  @Test
+  @Parameters(source = FieldType.class)
+  public void readOfNullWritesFieldGivenPdxFieldType(FieldType fieldType) throws Exception {
+    setupResultSet(resultSet, fieldType, null);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+
+    sqlToPdxInstanceCreator.create();
+
+    verifyPdxFactoryWrite(factory, fieldType, null);
+    verify(factory).create();
+  }
+
+  @Test
+  public void readOfCharFieldWithEmptyStringWritesCharZero() throws Exception {
+    FieldType fieldType = FieldType.CHAR;
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(resultSet.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(1);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+    when(resultSet.getString(1)).thenReturn("");
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+
+    sqlToPdxInstanceCreator.create();
+
+    char expectedValue = 0;
+    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
+    verify(factory).create();
+  }
+
+  @Test
+  @Parameters({"BOOLEAN_ARRAY", "OBJECT_ARRAY", "CHAR_ARRAY", "SHORT_ARRAY", "INT_ARRAY",
+      "LONG_ARRAY", "FLOAT_ARRAY", "DOUBLE_ARRAY", "STRING_ARRAY", "ARRAY_OF_BYTE_ARRAYS"})
+  public void throwsExceptionWhenReadWritesUnsupportedType(FieldType fieldType) throws Exception {
+    String returnValue = "ReturnValue";
+    setupPdxInstanceFactory(fieldType);
+    setupResultSetForObject(resultSet, returnValue);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_2)).thenReturn(PDX_FIELD_NAME_2);
+
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage("Could not convert ");
+    sqlToPdxInstanceCreator.create();
+  }
+
+  @Test
+  public void throwsExceptionIfMoreThanOneResultReturned() throws Exception {
+    setupResultSetForTwoObjectColumns(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getStatement()).thenReturn(mock(PreparedStatement.class));
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean()))
+        .thenReturn(mock(PdxInstanceFactory.class));
+
+    thrown.expect(JdbcConnectorException.class);
+    sqlToPdxInstanceCreator.create();
+  }
+
+  @Test
+  public void readThrowsGivenPdxTypeWithFieldMissing() throws Exception {
+    setupResultSet(resultSet, FieldType.OBJECT);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    String pdxClassName = "myPdxClassName";
+    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
+    TypeRegistry pdxTypeRegistry = mock(TypeRegistry.class);
+    when(cache.getPdxRegistry()).thenReturn(pdxTypeRegistry);
+    PdxType pdxType = mock(PdxType.class);
+    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
+    when(pdxTypeRegistry.getPdxTypeForField(PDX_FIELD_NAME_1, pdxClassName)).thenReturn(pdxType);
+    when(pdxType.getPdxField(PDX_FIELD_NAME_1)).thenReturn(null);
+    when(regionMapping.getFieldNameForColumn(COLUMN_NAME_1)).thenReturn(PDX_FIELD_NAME_1);
+
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage("Could not find PdxType");
+    sqlToPdxInstanceCreator.create();
+  }
+
+  private void setupResultSetForTwoObjectColumns(ResultSet result) throws SQLException {
+    setupResultSet(result, FieldType.OBJECT);
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(result.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(2);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
+    when(result.getObject(1)).thenReturn(COLUMN_VALUE_1);
+    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
+  }
+
+  private void setupResultSet(ResultSet result, FieldType fieldType) throws SQLException {
+    setupResultSet(result, fieldType, getValueByFieldType(fieldType));
+  }
+
+  private void setupResultSet(ResultSet result, FieldType fieldType, Object value)
+      throws SQLException {
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(result.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(1);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+
+    switch (fieldType) {
+      case STRING:
+        when(result.getString(1)).thenReturn((String) value);
+        break;
+      case CHAR:
+        Character charValue = (Character) value;
+        when(result.getString(1)).thenReturn(value == null ? null : charValue.toString());
+        break;
+      case SHORT:
+        when(result.getShort(1)).thenReturn(value == null ? 0 : (Short) value);
+        break;
+      case INT:
+        when(result.getInt(1)).thenReturn(value == null ? 0 : (Integer) value);
+        break;
+      case LONG:
+        when(result.getLong(1)).thenReturn(value == null ? 0 : (Long) value);
+        break;
+      case FLOAT:
+        when(result.getFloat(1)).thenReturn(value == null ? 0 : (Float) value);
+        break;
+      case DOUBLE:
+        when(result.getDouble(1)).thenReturn(value == null ? 0 : (Double) value);
+        break;
+      case BYTE:
+        when(result.getByte(1)).thenReturn(value == null ? 0 : (Byte) value);
+        break;
+      case BOOLEAN:
+        when(result.getBoolean(1)).thenReturn(value == null ? false : (Boolean) value);
+        break;
+      case DATE:
+        Date date = (Date) value;
+        java.sql.Timestamp sqlTimeStamp = null;
+        if (date != null) {
+          sqlTimeStamp = new java.sql.Timestamp(date.getTime());
+        }
+        when(result.getTimestamp(1)).thenReturn(sqlTimeStamp);
+        break;
+      case BYTE_ARRAY:
+        when(result.getBytes(1)).thenReturn((byte[]) value);
+        break;
+      case BOOLEAN_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case CHAR_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case SHORT_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case INT_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case LONG_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case FLOAT_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case DOUBLE_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case STRING_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case OBJECT_ARRAY:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case ARRAY_OF_BYTE_ARRAYS:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      case OBJECT:
+        when(result.getObject(1)).thenReturn(value);
+        break;
+      default:
+        throw new IllegalStateException("unhandled fieldType " + fieldType);
+    }
+
+  }
+
+  private static byte[][] arrayOfByteArray = new byte[][] {{1, 2}, {3, 4}};
+
+  private <T> T getValueByFieldType(FieldType fieldType) {
+    switch (fieldType) {
+      case STRING:
+        return (T) "stringValue";
+      case CHAR:
+        return (T) Character.valueOf('A');
+      case SHORT:
+        return (T) Short.valueOf((short) 36);
+      case INT:
+        return (T) Integer.valueOf(36);
+      case LONG:
+        return (T) Long.valueOf(36);
+      case FLOAT:
+        return (T) Float.valueOf(36);
+      case DOUBLE:
+        return (T) Double.valueOf(36);
+      case BYTE:
+        return (T) Byte.valueOf((byte) 36);
+      case BOOLEAN:
+        return (T) Boolean.TRUE;
+      case DATE:
+        return (T) new Date(1000);
+      case BYTE_ARRAY:
+        return (T) new byte[] {1, 2};
+      case BOOLEAN_ARRAY:
+        return (T) new boolean[] {true, false};
+      case CHAR_ARRAY:
+        return (T) new char[] {1, 2};
+      case SHORT_ARRAY:
+        return (T) new short[] {1, 2};
+      case INT_ARRAY:
+        return (T) new int[] {1, 2};
+      case LONG_ARRAY:
+        return (T) new long[] {1, 2};
+      case FLOAT_ARRAY:
+        return (T) new float[] {1, 2};
+      case DOUBLE_ARRAY:
+        return (T) new double[] {1, 2};
+      case STRING_ARRAY:
+        return (T) new String[] {"1", "2"};
+      case OBJECT_ARRAY:
+        return (T) new Object[] {1, 2};
+      case ARRAY_OF_BYTE_ARRAYS:
+        return (T) arrayOfByteArray;
+      case OBJECT:
+        return (T) "objectValue";
+      default:
+        throw new IllegalStateException("unhandled fieldType " + fieldType);
+    }
+  }
+
+  private PdxInstanceFactory setupPdxInstanceFactory(FieldType fieldType) {
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    String pdxClassName = "myPdxClassName";
+    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
+
+    TypeRegistry pdxTypeRegistry = mock(TypeRegistry.class);
+    when(cache.getPdxRegistry()).thenReturn(pdxTypeRegistry);
+    PdxType pdxType = mock(PdxType.class);
+
+    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
+    when(pdxTypeRegistry.getPdxTypeForField(PDX_FIELD_NAME_1, pdxClassName)).thenReturn(pdxType);
+    PdxField pdxField = mock(PdxField.class);
+    when(pdxType.getPdxField(PDX_FIELD_NAME_1)).thenReturn(pdxField);
+    when(pdxField.getFieldType()).thenReturn(fieldType);
+
+    return factory;
+  }
+
+  private void verifyPdxFactoryWrite(PdxInstanceFactory factory, FieldType fieldType) {
+    verifyPdxFactoryWrite(factory, fieldType, getValueByFieldType(fieldType));
+  }
+
+  private void verifyPdxFactoryWrite(PdxInstanceFactory factory, FieldType fieldType,
+      Object value) {
+    switch (fieldType) {
+      case STRING:
+        verify(factory).writeString(PDX_FIELD_NAME_1, (String) value);
+        break;
+      case CHAR:
+        verify(factory).writeChar(PDX_FIELD_NAME_1, value == null ? 0 : (char) value);
+        break;
+      case SHORT:
+        verify(factory).writeShort(PDX_FIELD_NAME_1, value == null ? 0 : (short) value);
+        break;
+      case INT:
+        verify(factory).writeInt(PDX_FIELD_NAME_1, value == null ? 0 : (int) value);
+        break;
+      case LONG:
+        verify(factory).writeLong(PDX_FIELD_NAME_1, value == null ? 0 : (long) value);
+        break;
+      case FLOAT:
+        verify(factory).writeFloat(PDX_FIELD_NAME_1, value == null ? 0 : (float) value);
+        break;
+      case DOUBLE:
+        verify(factory).writeDouble(PDX_FIELD_NAME_1, value == null ? 0 : (double) value);
+        break;
+      case BYTE:
+        verify(factory).writeByte(PDX_FIELD_NAME_1, value == null ? 0 : (byte) value);
+        break;
+      case BOOLEAN:
+        verify(factory).writeBoolean(PDX_FIELD_NAME_1, value == null ? false : (boolean) value);
+        break;
+      case DATE:
+        verify(factory).writeDate(PDX_FIELD_NAME_1, (Date) value);
+        break;
+      case BYTE_ARRAY:
+        verify(factory).writeByteArray(PDX_FIELD_NAME_1, (byte[]) value);
+        break;
+      case BOOLEAN_ARRAY:
+        verify(factory).writeBooleanArray(PDX_FIELD_NAME_1, (boolean[]) value);
+        break;
+      case CHAR_ARRAY:
+        verify(factory).writeCharArray(PDX_FIELD_NAME_1, (char[]) value);
+        break;
+      case SHORT_ARRAY:
+        verify(factory).writeShortArray(PDX_FIELD_NAME_1, (short[]) value);
+        break;
+      case INT_ARRAY:
+        verify(factory).writeIntArray(PDX_FIELD_NAME_1, (int[]) value);
+        break;
+      case LONG_ARRAY:
+        verify(factory).writeLongArray(PDX_FIELD_NAME_1, (long[]) value);
+        break;
+      case FLOAT_ARRAY:
+        verify(factory).writeFloatArray(PDX_FIELD_NAME_1, (float[]) value);
+        break;
+      case DOUBLE_ARRAY:
+        verify(factory).writeDoubleArray(PDX_FIELD_NAME_1, (double[]) value);
+        break;
+      case STRING_ARRAY:
+        verify(factory).writeStringArray(PDX_FIELD_NAME_1, (String[]) value);
+        break;
+      case OBJECT_ARRAY:
+        verify(factory).writeObjectArray(PDX_FIELD_NAME_1, (Object[]) value);
+        break;
+      case ARRAY_OF_BYTE_ARRAYS:
+        verify(factory).writeArrayOfByteArrays(PDX_FIELD_NAME_1, (byte[][]) value);
+        break;
+      case OBJECT:
+        verify(factory).writeObject(PDX_FIELD_NAME_1, value);
+        break;
+      default:
+        throw new IllegalStateException("unhandled fieldType " + fieldType);
+    }
+  }
+
+  private void setupResultSetForObject(ResultSet result, Object objectToReturn)
+      throws SQLException {
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(result.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(2);
+    when(result.getObject(1)).thenReturn(objectToReturn);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+
+    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
+    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java
index 3934b3a..bd56df6 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableKeyColumnManagerTest.java
@@ -88,7 +88,7 @@ public class TableKeyColumnManagerTest {
     when(resultSet.getString("TABLE_NAME")).thenReturn("otherTable");
 
     assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(IllegalStateException.class);
+        .isInstanceOf(JdbcConnectorException.class);
   }
 
   @Test
@@ -97,7 +97,7 @@ public class TableKeyColumnManagerTest {
     when(primaryKeys.next()).thenReturn(true);
 
     assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(IllegalStateException.class);
+        .isInstanceOf(JdbcConnectorException.class);
   }
 
   @Test
@@ -111,7 +111,7 @@ public class TableKeyColumnManagerTest {
     when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase());
 
     assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(IllegalStateException.class)
+        .isInstanceOf(JdbcConnectorException.class)
         .hasMessage("Duplicate tables that match region name");
   }
 
@@ -121,7 +121,7 @@ public class TableKeyColumnManagerTest {
     when(primaryKeys.next()).thenReturn(false);
 
     assertThatThrownBy(() -> tableKeyColumnManager.getKeyColumnName(connection, TABLE_NAME))
-        .isInstanceOf(IllegalStateException.class);
+        .isInstanceOf(JdbcConnectorException.class);
   }
 
   private ResultSet getPrimaryKeysMetaData() throws SQLException {
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
index ec0e64e..93c3f27 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
@@ -34,19 +34,29 @@ public class TestConfigService {
 
   public static JdbcConnectorServiceImpl getTestConfigService()
       throws ConnectionConfigExistsException, RegionMappingExistsException {
-    InternalCache cache = mock(InternalCache.class);
-    when(cache.getExtensionPoint()).thenReturn(mock(ExtensionPoint.class));
+    return getTestConfigService(createMockCache(), null, false);
+  }
+
+  public static JdbcConnectorServiceImpl getTestConfigService(InternalCache cache,
+      String pdxClassName, boolean primaryKeyInValue)
+      throws ConnectionConfigExistsException, RegionMappingExistsException {
 
     JdbcConnectorServiceImpl service = new JdbcConnectorServiceImpl();
     service.init(cache);
     service.createConnectionConfig(createConnectionConfig());
-    service.createRegionMapping(createRegionMapping());
+    service.createRegionMapping(createRegionMapping(pdxClassName, primaryKeyInValue));
     return service;
   }
 
-  private static RegionMapping createRegionMapping() {
-    return new RegionMapping(REGION_NAME, null, REGION_TABLE_NAME, CONNECTION_CONFIG_NAME, false,
-        Collections.emptyMap());
+  private static InternalCache createMockCache() { // default cache for the no-arg overload
+    InternalCache cache = mock(InternalCache.class);
+    when(cache.getExtensionPoint()).thenReturn(mock(ExtensionPoint.class)); // not Mockito's null
+    return cache;
+  }
+
+  private static RegionMapping createRegionMapping(String pdxClassName, boolean primaryKeyInValue) {
+    return new RegionMapping(REGION_NAME, pdxClassName, REGION_TABLE_NAME, CONNECTION_CONFIG_NAME,
+        primaryKeyInValue, Collections.emptyMap());
   }
 
   private static ConnectionConfiguration createConnectionConfig() {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java b/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java
index b76a7ab..e120432 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/GemFireCache.java
@@ -215,6 +215,23 @@ public interface GemFireCache extends RegionService {
   boolean getPdxIgnoreUnreadFields();
 
   /**
+   * Registers PDX meta-data given an instance of a domain class that will be serialized
+   * with PDX.
+   * <p>
+   * Note that if the instance is not of a class that will be serialized with PDX
+   * then no meta-data is registered.
+   * <p>
+   * Note that in most cases this method never needs to be called. Currently it is only
+   * needed by the JdbcLoader when gets are done for JDBC data that was not written to the
+   * table using Geode.
+   *
+   * @param instance the instance of the domain class for which meta-data is to be registered
+   * @throws SerializationException if the instance cannot be serialized
+   * @since Geode 1.6
+   */
+  void registerPdxMetaData(Object instance);
+
+  /**
    * Get the CacheTransactionManager instance for this Cache.
    *
    * @return The CacheTransactionManager instance.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 37b8259..5fd2b96 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -89,6 +89,7 @@ import org.apache.geode.GemFireCacheException;
 import org.apache.geode.GemFireConfigException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.LogWriter;
+import org.apache.geode.SerializationException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.admin.internal.SystemMemberCacheEventProcessor;
 import org.apache.geode.cache.AttributesFactory;
@@ -221,6 +222,7 @@ import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.security.SecurityServiceFactory;
 import org.apache.geode.internal.sequencelog.SequenceLoggerImpl;
 import org.apache.geode.internal.tcp.ConnectionTable;
+import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.internal.util.concurrent.FutureResult;
 import org.apache.geode.lang.Identifiable;
 import org.apache.geode.management.internal.JmxManagerAdvisee;
@@ -5333,4 +5335,13 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
       pdxRegistry.setPdxReadSerializedOverride(pdxReadSerialized);
     }
   }
+
+  @Override
+  public void registerPdxMetaData(Object instance) {
+    try {
+      BlobHelper.serializeToBlob(instance); // blob is discarded; presumably serializing is what registers the PDX type - TODO confirm
+    } catch (IOException e) {
+      throw new SerializationException("Serialization failed", e); // unchecked rethrow, cause kept
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 9916de5..ffc452e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -2359,4 +2359,9 @@ public class CacheCreation implements InternalCache {
   public void setPdxReadSerializedOverride(boolean pdxReadSerialized) {
     throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
   }
+
+  @Override
+  public void registerPdxMetaData(Object instance) { // not supported on CacheCreation: always throws
+    throw new UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/AutoSerializableManager.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/AutoSerializableManager.java
index ddac809..1d19242 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/AutoSerializableManager.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/AutoSerializableManager.java
@@ -78,7 +78,7 @@ public class AutoSerializableManager {
    * not evaluate any hardcoded excludes. This helps with testing as well as possibly debugging
    * future customer issues.
    */
-  private static final String NO_HARDCODED_EXCLUDES_PARAM =
+  public static final String NO_HARDCODED_EXCLUDES_PARAM =
       DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes";
 
   private boolean noHardcodedExcludes = Boolean.getBoolean(NO_HARDCODED_EXCLUDES_PARAM);
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
index 869dc47..5537694 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/TypeRegistry.java
@@ -493,7 +493,7 @@ public class TypeRegistry {
    * @param className the PdxTypes for this class would be searched
    * @return PdxType having the field or null if not found
    */
-  PdxType getPdxTypeForField(String fieldName, String className) {
+  public PdxType getPdxTypeForField(String fieldName, String className) {
     return this.distributedTypeRegistry.getPdxTypeForField(fieldName, className);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/DataCommandFunctionWithPDXJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/DataCommandFunctionWithPDXJUnitTest.java
index e3c0bf5..b8e4e8e 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/DataCommandFunctionWithPDXJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/DataCommandFunctionWithPDXJUnitTest.java
@@ -45,7 +45,7 @@ public class DataCommandFunctionWithPDXJUnitTest {
 
   @Rule
   public ServerStarterRule server = new ServerStarterRule().withPDXPersistent()
-      .withRegion(RegionShortcut.PARTITION, PARTITIONED_REGION);
+      .withPDXReadSerialized().withRegion(RegionShortcut.PARTITION, PARTITIONED_REGION);
 
   private Customer alice;
   private Customer bob;
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java
index 979c83e..133de3a 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/AutoSerializableJUnitTest.java
@@ -27,6 +27,7 @@ import java.net.URLClassLoader;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.jmock.auto.Auto;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,8 +64,7 @@ public class AutoSerializableJUnitTest {
 
   @Before
   public void setUp() throws Exception {
-    System.setProperty(
-        DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes", "true");
+    System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
   }
 
   @After
@@ -1238,8 +1238,7 @@ public class AutoSerializableJUnitTest {
    */
   @Test
   public void testNoHardCodedExcludes() {
-    System.setProperty(
-        DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes", "true");
+    System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
     setupSerializer();
     assertFalse(manager.isExcluded("com.gemstone.gemfire.GemFireException"));
     assertFalse(manager.isExcluded("com.gemstoneplussuffix.gemfire.GemFireException"));
@@ -1257,8 +1256,7 @@ public class AutoSerializableJUnitTest {
    */
   @Test
   public void testHardCodedExcludes() {
-    System.setProperty(
-        DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes", "false");
+    System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "false");
     setupSerializer();
     assertTrue(manager.isExcluded("com.gemstone.gemfire.GemFireException"));
     assertFalse(manager.isExcluded("com.gemstoneplussuffix.gemfire.GemFireException"));
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
index 9e53131..8257915 100644
--- a/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/pdx/PdxClientServerDUnitTest.java
@@ -50,6 +50,7 @@ import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.PdxSerializerObject;
 import org.apache.geode.internal.Version;
+import org.apache.geode.pdx.internal.AutoSerializableManager;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.NetworkUtils;
@@ -185,12 +186,10 @@ public class PdxClientServerDUnitTest extends JUnit4CacheTestCase {
     VM vm1 = host.getVM(1);
     VM vm2 = host.getVM(2);
 
-    System.setProperty(
-        DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes", "true");
+    System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
     Invoke.invokeInEveryVM(new SerializableRunnable() {
       public void run() {
-        System.setProperty(
-            DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes", "true");
+        System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true");
       }
     });
     try {
@@ -248,13 +247,10 @@ public class PdxClientServerDUnitTest extends JUnit4CacheTestCase {
         return null;
       });
     } finally {
-      System.setProperty(
-          DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes", "false");
+      System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "false");
       Invoke.invokeInEveryVM(new SerializableRunnable() {
         public void run() {
-          System.setProperty(
-              DistributionConfig.GEMFIRE_PREFIX + "auto.serialization.no.hardcoded.excludes",
-              "false");
+          System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "false");
         }
       });
     }
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/DistributedRestoreSystemProperties.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/DistributedRestoreSystemProperties.java
index 5d6c6c4..592fe05 100755
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/DistributedRestoreSystemProperties.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/DistributedRestoreSystemProperties.java
@@ -51,12 +51,18 @@ public class DistributedRestoreSystemProperties extends RestoreSystemProperties
       public void run() {
         if (originalProperties == null) {
           originalProperties = getProperties();
-          setProperties(new Properties(originalProperties));
+          setProperties(copyOf(originalProperties));
         }
       }
     });
   }
 
+  private Properties copyOf(Properties source) { // replaces `new Properties(source)` at the call site
+    Properties copy = new Properties(); // empty table, no defaults list attached
+    copy.putAll(source); // copies source's own entries; values reachable only via source's defaults are not included
+    return copy;
+  }
+
   @Override
   public void after() {
     super.after();
diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/rules/ServerStarterRule.java b/geode-core/src/test/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
index 2abb297..94dcb55 100644
--- a/geode-core/src/test/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxSerializer;
 
 /**
  * This is a rule to start up a server in your current VM. It's useful for your Integration Tests.
@@ -55,6 +56,8 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl
   private transient CacheServer server;
   private int embeddedLocatorPort = -1;
   private boolean pdxPersistent = false;
+  private PdxSerializer pdxSerializer = null;
+  private boolean pdxReadSerialized = false;
 
   private Map<String, RegionShortcut> regions = new HashMap<>();
 
@@ -109,6 +112,16 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl
     return this;
   }
 
+  public ServerStarterRule withPDXReadSerialized() {
+    pdxReadSerialized = true; // passed to CacheFactory.setPdxReadSerialized in startServer()
+    return this;
+  }
+
+  public ServerStarterRule withPdxSerializer(PdxSerializer pdxSerializer) {
+    this.pdxSerializer = pdxSerializer; // startServer() installs it on the CacheFactory only when non-null
+    return this;
+  }
+
   public ServerStarterRule withEmbeddedLocator() {
     embeddedLocatorPort = AvailablePortHelper.getRandomAvailableTCPPort();
     properties.setProperty("start-locator", "localhost[" + embeddedLocatorPort + "]");
@@ -151,8 +164,11 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl
 
   public void startServer() {
     CacheFactory cf = new CacheFactory(this.properties);
-    cf.setPdxReadSerialized(pdxPersistent);
     cf.setPdxPersistent(pdxPersistent);
+    cf.setPdxReadSerialized(pdxReadSerialized);
+    if (pdxSerializer != null) {
+      cf.setPdxSerializer(pdxSerializer);
+    }
     cache = (InternalCache) cf.create();
     DistributionConfig config =
         ((InternalDistributedSystem) cache.getDistributedSystem()).getConfig();

-- 
To stop receiving notification emails like this one, please contact
dschneider@apache.org.