You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/01/31 00:57:07 UTC

[geode] branch feature/GEODE-6291 updated: WIP: SqlToPdxInstanceTest needs to be rewritten

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch feature/GEODE-6291
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6291 by this push:
     new e9ae3a9  WIP: SqlToPdxInstanceTest needs to be rewritten
e9ae3a9 is described below

commit e9ae3a9b29a5c9a5f01ef991d122705cad0d2848
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Jan 30 16:56:16 2019 -0800

    WIP: SqlToPdxInstanceTest needs to be rewritten
---
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |  10 +-
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |  13 +-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java |  10 +-
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     |   2 +-
 .../apache/geode/connectors/jdbc/JdbcLoader.java   |  11 +-
 .../apache/geode/connectors/jdbc/JdbcWriter.java   |   2 +-
 .../jdbc/internal/AbstractJdbcCallback.java        |  16 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |  77 ++-
 .../connectors/jdbc/internal/SqlToPdxInstance.java | 258 +++++++
 .../jdbc/internal/SqlToPdxInstanceCreator.java     | 222 ++----
 .../jdbc/internal/AbstractJdbcCallbackTest.java    |  11 +-
 .../connectors/jdbc/internal/SqlHandlerTest.java   |  38 +-
 .../jdbc/internal/SqlToPdxInstanceCreatorTest.java | 758 +++------------------
 ...eCreatorTest.java => SqlToPdxInstanceTest.java} |   6 +-
 14 files changed, 522 insertions(+), 912 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 15f2b68..d2c78cb 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -326,10 +326,9 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   private Region<Object, PdxInstance> createRegionWithJDBCAsyncWriter(String regionName, String ids,
       List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
-    jdbcWriter = new JdbcAsyncWriter(createSqlHandler(ids, fieldMappings), cache);
+    jdbcWriter = new JdbcAsyncWriter(createSqlHandler(regionName, ids, fieldMappings), cache);
     cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
         .create("jdbcAsyncQueue", jdbcWriter);
-
     RegionFactory<Object, PdxInstance> regionFactory = cache.createRegionFactory(REPLICATE);
     regionFactory.addAsyncEventQueueId("jdbcAsyncQueue");
     return regionFactory.create(regionName);
@@ -342,11 +341,12 @@ public abstract class JdbcAsyncWriterIntegrationTest {
     assertThat(size).isEqualTo(expected);
   }
 
-  private SqlHandler createSqlHandler(String ids, List<FieldMapping> fieldMappings)
+  private SqlHandler createSqlHandler(String regionName, String ids,
+      List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
-    return new SqlHandler(new TableMetaDataManager(),
+    return new SqlHandler(cache, regionName, new TableMetaDataManager(),
         TestConfigService.getTestConfigService(ids, fieldMappings),
-        testDataSourceFactory);
+        testDataSourceFactory, false);
   }
 
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index b1a8d00..f9538bf 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -228,24 +228,27 @@ public abstract class JdbcLoaderIntegrationTest {
     assertThat(pdx).isNull();
   }
 
-  protected SqlHandler createSqlHandler(String pdxClassName, String ids, String catalog,
+  protected SqlHandler createSqlHandler(String regionName, String pdxClassName, String ids,
+      String catalog,
       String schema, List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
-    return new SqlHandler(new TableMetaDataManager(),
+    return new SqlHandler(cache, regionName, new TableMetaDataManager(),
         TestConfigService.getTestConfigService((InternalCache) cache, pdxClassName, ids, catalog,
             schema, fieldMappings),
-        testDataSourceFactory);
+        testDataSourceFactory, true);
   }
 
   protected <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
       String ids, String catalog, String schema, List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
     JdbcLoader<K, V> jdbcLoader =
-        new JdbcLoader<>(createSqlHandler(pdxClassName, ids, catalog, schema, fieldMappings),
+        new JdbcLoader<>(
+            createSqlHandler(regionName, pdxClassName, ids, catalog, schema, fieldMappings),
             cache);
     RegionFactory<K, V> regionFactory = cache.createRegionFactory(REPLICATE);
     regionFactory.setCacheLoader(jdbcLoader);
-    return regionFactory.create(regionName);
+    Region<K, V> region = regionFactory.create(regionName);
+    return region;
   }
 
   protected <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
index 9cfe1dd..d95699d 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -400,7 +400,8 @@ public abstract class JdbcWriterIntegrationTest {
   protected Region<Object, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName,
       String ids, String catalog, String schema, List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
-    jdbcWriter = new JdbcWriter(createSqlHandler(ids, catalog, schema, fieldMappings), cache);
+    jdbcWriter =
+        new JdbcWriter(createSqlHandler(regionName, ids, catalog, schema, fieldMappings), cache);
 
     RegionFactory<Object, PdxInstance> regionFactory =
         cache.createRegionFactory(RegionShortcut.REPLICATE);
@@ -415,12 +416,13 @@ public abstract class JdbcWriterIntegrationTest {
     assertThat(size).isEqualTo(expected);
   }
 
-  protected SqlHandler createSqlHandler(String ids, String catalog, String schema,
+  protected SqlHandler createSqlHandler(String regionName, String ids, String catalog,
+      String schema,
       List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
-    return new SqlHandler(new TableMetaDataManager(),
+    return new SqlHandler(cache, regionName, new TableMetaDataManager(),
         TestConfigService.getTestConfigService(cache, null, ids, catalog, schema, fieldMappings),
-        testDataSourceFactory);
+        testDataSourceFactory, false);
   }
 
   protected void assertRecordMatchesEmployee(ResultSet resultSet, String id, Employee employee)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index e36469d..2668606 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -59,7 +59,7 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
     changeTotalEvents(events.size());
 
     if (!events.isEmpty()) {
-      checkInitialized((InternalCache) events.get(0).getRegion().getRegionService());
+      checkInitialized(events.get(0).getRegion());
     }
 
     Boolean initialPdxReadSerialized = cache.getPdxReadSerializedOverride();
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
index d76baf9..de1da33 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
@@ -42,16 +42,21 @@ public class JdbcLoader<K, V> extends AbstractJdbcCallback implements CacheLoade
     super(sqlHandler, cache);
   }
 
+  @Override
+  protected boolean supportsReads() {
+    return true;
+  }
+
   /**
    * @return this method always returns a PdxInstance. It does not matter what the V generic
    *         parameter is set to.
    */
   @Override
   public V load(LoaderHelper<K, V> helper) throws CacheLoaderException {
-    // The following cast to V is to keep the compiler happy
-    // but is erased at runtime and no actual cast happens.
-    checkInitialized((InternalCache) helper.getRegion().getRegionService());
+    checkInitialized(helper.getRegion());
     try {
+      // The following cast to V is to keep the compiler happy
+      // but is erased at runtime and no actual cast happens.
       return (V) getSqlHandler().read(helper.getRegion(), helper.getKey());
     } catch (SQLException e) {
       throw JdbcConnectorException.createException(e);
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
index 998cb8a..546094e 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
@@ -79,7 +79,7 @@ public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWrite
     if (eventCanBeIgnored(event.getOperation())) {
       return;
     }
-    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    checkInitialized(event.getRegion());
     totalEvents.add(1);
     try {
       getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
index 3937f59..5282d20 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -17,6 +17,7 @@ package org.apache.geode.connectors.jdbc.internal;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.CacheCallback;
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.InternalCache;
 
 @Experimental
@@ -38,9 +39,9 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
     return sqlHandler;
   }
 
-  protected void checkInitialized(InternalCache cache) {
+  protected void checkInitialized(Region<?, ?> region) {
     if (sqlHandler == null) {
-      initialize(cache);
+      initialize(region);
     }
   }
 
@@ -48,12 +49,17 @@ public abstract class AbstractJdbcCallback implements CacheCallback {
     return operation.isLoad();
   }
 
-  private synchronized void initialize(InternalCache cache) {
+  protected boolean supportsReads() {
+    return false;
+  }
+
+  private synchronized void initialize(Region<?, ?> region) {
     if (sqlHandler == null) {
-      this.cache = cache;
+      this.cache = (InternalCache) region.getRegionService();
       JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
       TableMetaDataManager tableMetaDataManager = new TableMetaDataManager();
-      sqlHandler = new SqlHandler(tableMetaDataManager, service);
+      sqlHandler =
+          new SqlHandler(cache, region.getName(), tableMetaDataManager, service, supportsReads());
     }
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 3629041..a5ab6e1 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -36,28 +36,33 @@ import org.apache.geode.pdx.PdxInstance;
 
 @Experimental
 public class SqlHandler {
-  private final JdbcConnectorService configService;
   private final TableMetaDataManager tableMetaDataManager;
-  private final DataSourceFactory dataSourceFactory;
+  private final RegionMapping regionMapping;
+  private final DataSource dataSource;
+  private final SqlToPdxInstance sqlToPdxInstance;
 
-  public SqlHandler(TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService,
-      DataSourceFactory dataSourceFactory) {
+  public SqlHandler(InternalCache cache, String regionName,
+      TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService,
+      DataSourceFactory dataSourceFactory, boolean supportReading) {
     this.tableMetaDataManager = tableMetaDataManager;
-    this.configService = configService;
-    this.dataSourceFactory = dataSourceFactory;
+    this.regionMapping = getMappingForRegion(configService, regionName);
+    this.dataSource = getDataSource(dataSourceFactory, this.regionMapping.getDataSourceName());
+    this.sqlToPdxInstance = createSqlToPdxInstance(supportReading, cache, regionMapping);
   }
 
-  public SqlHandler(TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService) {
-    this(tableMetaDataManager, configService,
-        dataSourceName -> JNDIInvoker.getDataSource(dataSourceName));
-  }
-
-  Connection getConnection(String dataSourceName) throws SQLException {
-    return getDataSource(dataSourceName).getConnection();
+  private static RegionMapping getMappingForRegion(JdbcConnectorService configService,
+      String regionName) {
+    RegionMapping regionMapping = configService.getMappingForRegion(regionName);
+    if (regionMapping == null) {
+      throw new JdbcConnectorException("JDBC mapping for region " + regionName
+          + " not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
+    }
+    return regionMapping;
   }
 
-  DataSource getDataSource(String dataSourceName) {
-    DataSource dataSource = this.dataSourceFactory.getDataSource(dataSourceName);
+  private static DataSource getDataSource(DataSourceFactory dataSourceFactory,
+      String dataSourceName) {
+    DataSource dataSource = dataSourceFactory.getDataSource(dataSourceName);
     if (dataSource == null) {
       throw new JdbcConnectorException("JDBC data-source named \"" + dataSourceName
           + "\" not found. Create it with gfsh 'create data-source --pooled --name="
@@ -66,14 +71,34 @@ public class SqlHandler {
     return dataSource;
   }
 
+  private static SqlToPdxInstance createSqlToPdxInstance(boolean supportReading,
+      InternalCache cache, RegionMapping regionMapping) {
+    if (!supportReading) {
+      return null;
+    }
+    SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
+        new SqlToPdxInstanceCreator(cache, regionMapping);
+    return sqlToPdxInstanceCreator.create();
+  }
+
+  public SqlHandler(InternalCache cache, String regionName,
+      TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService,
+      boolean supportReading) {
+    this(cache, regionName, tableMetaDataManager, configService,
+        dataSourceName -> JNDIInvoker.getDataSource(dataSourceName), supportReading);
+  }
+
+  Connection getConnection() throws SQLException {
+    return this.dataSource.getConnection();
+  }
+
   public <K, V> PdxInstance read(Region<K, V> region, K key) throws SQLException {
     if (key == null) {
       throw new IllegalArgumentException("Key for query cannot be null");
     }
 
-    RegionMapping regionMapping = getMappingForRegion(region.getName());
     PdxInstance result;
-    try (Connection connection = getConnection(regionMapping.getDataSourceName())) {
+    try (Connection connection = getConnection()) {
       TableMetaDataView tableMetaData =
           this.tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
       EntryColumnData entryColumnData =
@@ -81,10 +106,7 @@ public class SqlHandler {
       try (PreparedStatement statement =
           getPreparedStatement(connection, tableMetaData, entryColumnData, Operation.GET)) {
         try (ResultSet resultSet = executeReadQuery(statement, entryColumnData)) {
-          InternalCache cache = (InternalCache) region.getRegionService();
-          SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
-              new SqlToPdxInstanceCreator(cache, regionMapping, resultSet);
-          result = sqlToPdxInstanceCreator.create();
+          result = sqlToPdxInstance.create(resultSet);
         }
       }
     }
@@ -97,16 +119,6 @@ public class SqlHandler {
     return statement.executeQuery();
   }
 
-  private RegionMapping getMappingForRegion(String regionName) {
-    RegionMapping regionMapping =
-        this.configService.getMappingForRegion(regionName);
-    if (regionMapping == null) {
-      throw new JdbcConnectorException("JDBC mapping for region " + regionName
-          + " not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
-    }
-    return regionMapping;
-  }
-
   private void setValuesInStatement(PreparedStatement statement, EntryColumnData entryColumnData,
       Operation operation)
       throws SQLException {
@@ -164,9 +176,8 @@ public class SqlHandler {
     if (value == null && !operation.isDestroy()) {
       throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
     }
-    RegionMapping regionMapping = getMappingForRegion(region.getName());
 
-    try (Connection connection = getConnection(regionMapping.getDataSourceName())) {
+    try (Connection connection = getConnection()) {
       TableMetaDataView tableMetaData =
           this.tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
       EntryColumnData entryColumnData =
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java
new file mode 100644
index 0000000..a66960c
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.Blob;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.pdx.FieldType;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.WritablePdxInstance;
+
+public class SqlToPdxInstance {
+  private PdxInstance pdxTemplate;
+  private final Map<String, PdxFieldInfo> columnToPdxFieldMap = new HashMap<>();
+
+  // for unit testing
+  PdxInstance getPdxTemplate() {
+    return pdxTemplate;
+  }
+
+  public void setPdxTemplate(PdxInstance template) {
+    this.pdxTemplate = template;
+  }
+
+  public void addMapping(String columnName, String pdxFieldName, FieldType pdxFieldType) {
+    columnToPdxFieldMap.put(columnName, new PdxFieldInfo(pdxFieldName, pdxFieldType));
+  }
+
+  // for unit testing
+  Map<String, PdxFieldInfo> getColumnToPdxFieldMap() {
+    return columnToPdxFieldMap;
+  }
+
+  static class PdxFieldInfo {
+    private final String name;
+    private final FieldType type;
+
+    public PdxFieldInfo(String name, FieldType type) {
+      this.name = name;
+      this.type = type;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public FieldType getType() {
+      return type;
+    }
+  }
+
+  public PdxInstance create(ResultSet resultSet) throws SQLException {
+    if (!resultSet.next()) {
+      return null;
+    }
+    WritablePdxInstance result = pdxTemplate.createWriter();
+    ResultSetMetaData metaData = resultSet.getMetaData();
+    final int columnCount = metaData.getColumnCount();
+    for (int i = 1; i <= columnCount; i++) {
+      String columnName = metaData.getColumnName(i);
+      PdxFieldInfo fieldInfo = this.columnToPdxFieldMap.get(columnName);
+      if (fieldInfo == null) {
+        // TODO: this column has no field mapping, so it was added to the table after
+        // 'create jdbc-mapping' was run. Log a warning (only once) and ignore this column.
+        continue;
+      }
+      Object fieldValue = getFieldValue(resultSet, i, fieldInfo.getType(), metaData);
+      result.setField(fieldInfo.getName(), fieldValue);
+    }
+    if (resultSet.next()) {
+      throw new JdbcConnectorException(
+          "Multiple rows returned for query: " + resultSet.getStatement());
+    }
+    return result;
+  }
+
+  /**
+   * @throws SQLException if reading the column value fails
+   */
+  private Object getFieldValue(ResultSet resultSet, int columnIndex, FieldType fieldType,
+      ResultSetMetaData metaData)
+      throws SQLException {
+    switch (fieldType) {
+      case STRING:
+        return resultSet.getString(columnIndex);
+      case CHAR:
+        return getCharValue(resultSet, columnIndex);
+      case SHORT:
+        return resultSet.getShort(columnIndex);
+      case INT:
+        return resultSet.getInt(columnIndex);
+      case LONG:
+        return resultSet.getLong(columnIndex);
+      case FLOAT:
+        return resultSet.getFloat(columnIndex);
+      case DOUBLE:
+        return resultSet.getDouble(columnIndex);
+      case BYTE:
+        return resultSet.getByte(columnIndex);
+      case BOOLEAN:
+        return resultSet.getBoolean(columnIndex);
+      case DATE:
+        return getDateValue(resultSet, columnIndex, metaData);
+      case BYTE_ARRAY:
+        return getByteArrayValue(resultSet, columnIndex, metaData);
+      case BOOLEAN_ARRAY:
+        return convertJdbcObjectToJavaType(boolean[].class, resultSet.getObject(columnIndex));
+      case CHAR_ARRAY:
+        return convertJdbcObjectToJavaType(char[].class, resultSet.getObject(columnIndex));
+      case SHORT_ARRAY:
+        return convertJdbcObjectToJavaType(short[].class, resultSet.getObject(columnIndex));
+      case INT_ARRAY:
+        return convertJdbcObjectToJavaType(int[].class, resultSet.getObject(columnIndex));
+      case LONG_ARRAY:
+        return convertJdbcObjectToJavaType(long[].class, resultSet.getObject(columnIndex));
+      case FLOAT_ARRAY:
+        return convertJdbcObjectToJavaType(float[].class, resultSet.getObject(columnIndex));
+      case DOUBLE_ARRAY:
+        return convertJdbcObjectToJavaType(double[].class, resultSet.getObject(columnIndex));
+      case STRING_ARRAY:
+        return convertJdbcObjectToJavaType(String[].class, resultSet.getObject(columnIndex));
+      case OBJECT_ARRAY:
+        return convertJdbcObjectToJavaType(Object[].class, resultSet.getObject(columnIndex));
+      case ARRAY_OF_BYTE_ARRAYS:
+        return convertJdbcObjectToJavaType(byte[][].class, resultSet.getObject(columnIndex));
+      case OBJECT:
+        return getObjectValue(resultSet, columnIndex, metaData);
+      default:
+        throw new IllegalStateException("unhandled pdx field type: " + fieldType);
+    }
+  }
+
+  private Object getObjectValue(ResultSet resultSet, int columnIndex, ResultSetMetaData metaData)
+      throws SQLException {
+    Object v;
+    if (isBlobColumn(columnIndex, metaData)) {
+      v = getBlobData(resultSet, columnIndex);
+    } else {
+      v = resultSet.getObject(columnIndex);
+      if (v instanceof java.util.Date) {
+        if (v instanceof java.sql.Date) {
+          java.sql.Date sqlDate = (java.sql.Date) v;
+          v = new java.util.Date(sqlDate.getTime());
+        } else if (v instanceof java.sql.Time) {
+          java.sql.Time sqlTime = (java.sql.Time) v;
+          v = new java.util.Date(sqlTime.getTime());
+        } else if (v instanceof java.sql.Timestamp) {
+          java.sql.Timestamp sqlTimestamp = (java.sql.Timestamp) v;
+          v = new java.util.Date(sqlTimestamp.getTime());
+        }
+      }
+    }
+    return v;
+  }
+
+  private Object getByteArrayValue(ResultSet resultSet, int columnIndex, ResultSetMetaData metaData)
+      throws SQLException {
+    byte[] byteData;
+    if (isBlobColumn(columnIndex, metaData)) {
+      byteData = getBlobData(resultSet, columnIndex);
+    } else {
+      byteData = resultSet.getBytes(columnIndex);
+    }
+    return byteData;
+  }
+
+  private Object getCharValue(ResultSet resultSet, int columnIndex) throws SQLException {
+    char charValue = 0;
+    String columnValue = resultSet.getString(columnIndex);
+    if (columnValue != null && columnValue.length() > 0) {
+      charValue = columnValue.toCharArray()[0];
+    }
+    return charValue;
+  }
+
+  private java.util.Date getDateValue(ResultSet resultSet, int columnIndex,
+      ResultSetMetaData metaData)
+      throws SQLException {
+    java.util.Date sqlDate;
+    int columnType = metaData.getColumnType(columnIndex);
+    switch (columnType) {
+      case Types.DATE:
+        sqlDate = resultSet.getDate(columnIndex);
+        break;
+      case Types.TIME:
+      case Types.TIME_WITH_TIMEZONE:
+        sqlDate = resultSet.getTime(columnIndex);
+        break;
+      default:
+        sqlDate = resultSet.getTimestamp(columnIndex);
+        break;
+    }
+    java.util.Date pdxDate = null;
+    if (sqlDate != null) {
+      pdxDate = new java.util.Date(sqlDate.getTime());
+    }
+    return pdxDate;
+  }
+
+  private boolean isBlobColumn(int columnIndex, ResultSetMetaData metaData) throws SQLException {
+    int columnType = metaData.getColumnType(columnIndex);
+    return Types.BLOB == columnType;
+  }
+
+  /**
+   * If the given column contains a Blob, returns its data as a byte array;
+   * otherwise returns null.
+   *
+   * @throws JdbcConnectorException if blob is too big to fit in a byte array
+   */
+  private byte[] getBlobData(ResultSet resultSet, int columnIndex) throws SQLException {
+    Blob blob = resultSet.getBlob(columnIndex);
+    if (blob == null) {
+      return null;
+    }
+    try {
+      long blobLength = blob.length();
+      if (blobLength > Integer.MAX_VALUE) {
+        throw new JdbcConnectorException(
+            "Blob of length " + blobLength + " is too big to be converted to a byte array.");
+      }
+      return blob.getBytes(1, (int) blobLength);
+    } finally {
+      blob.free();
+    }
+  }
+
+  private <T> T convertJdbcObjectToJavaType(Class<T> javaType, Object jdbcObject) {
+    try {
+      return javaType.cast(jdbcObject);
+    } catch (ClassCastException classCastException) {
+      throw JdbcConnectorException.createException("Could not convert "
+          + jdbcObject.getClass().getTypeName() + " to " + javaType.getTypeName(),
+          classCastException);
+    }
+  }
+
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
index 84cdce0..d55de8b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
@@ -14,18 +14,12 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
-import java.sql.Blob;
 import java.sql.JDBCType;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
 
-import org.apache.geode.connectors.jdbc.JdbcConnectorException;
 import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.FieldType;
-import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.internal.PdxField;
 import org.apache.geode.pdx.internal.TypeRegistry;
@@ -33,32 +27,18 @@ import org.apache.geode.pdx.internal.TypeRegistry;
 class SqlToPdxInstanceCreator {
   private final InternalCache cache;
   private final RegionMapping regionMapping;
-  private final ResultSet resultSet;
-  private final PdxInstanceFactory factory;
 
-  public SqlToPdxInstanceCreator(InternalCache cache, RegionMapping regionMapping,
-      ResultSet resultSet) {
+  public SqlToPdxInstanceCreator(InternalCache cache, RegionMapping regionMapping) {
     this.cache = cache;
     this.regionMapping = regionMapping;
-    this.resultSet = resultSet;
-    this.factory = createPdxInstanceFactory();
   }
 
-  public PdxInstance create() throws SQLException {
-    if (!resultSet.next()) {
-      return null;
-    }
+  public SqlToPdxInstance create() {
     TypeRegistry typeRegistry = cache.getPdxRegistry();
-    ResultSetMetaData metaData = resultSet.getMetaData();
-    final int columnCount = metaData.getColumnCount();
-    for (int i = 1; i <= columnCount; i++) {
-      String columnName = metaData.getColumnName(i);
-      FieldMapping columnMapping = regionMapping.getFieldMappingByJdbcName(columnName);
-      if (columnMapping == null) {
-        // TODO: this column was added since create jdbc-mapping was done.
-        // Log a warning, once, and just ignore this column
-        continue;
-      }
+    SqlToPdxInstance result = new SqlToPdxInstance();
+    PdxInstanceFactory templateFactory = createPdxInstanceFactory();
+    for (FieldMapping columnMapping : regionMapping.getFieldMappings()) {
+      String columnName = columnMapping.getJdbcName();
       String fieldName = columnMapping.getPdxName();
       FieldType fieldType;
       if (fieldName.isEmpty()) {
@@ -75,13 +55,11 @@ class SqlToPdxInstanceCreator {
       } else {
         fieldType = FieldType.valueOf(columnMapping.getPdxType());
       }
-      writeField(columnMapping, i, fieldName, fieldType);
-    }
-    if (resultSet.next()) {
-      throw new JdbcConnectorException(
-          "Multiple rows returned for query: " + resultSet.getStatement());
+      result.addMapping(columnName, fieldName, fieldType);
+      writeField(templateFactory, columnMapping, fieldName, fieldType);
     }
-    return factory.create();
+    result.setPdxTemplate(templateFactory.create());
+    return result;
   }
 
   private PdxInstanceFactory createPdxInstanceFactory() {
@@ -89,185 +67,86 @@ class SqlToPdxInstanceCreator {
     return cache.createPdxInstanceFactory(valueClassName);
   }
 
-  /**
-   * @throws SQLException if the column value get fails
-   */
-  private void writeField(FieldMapping columnMapping, int columnIndex, String fieldName,
-      FieldType fieldType)
-      throws SQLException {
+  private void writeField(PdxInstanceFactory factory, FieldMapping columnMapping, String fieldName,
+      FieldType fieldType) {
     switch (fieldType) {
       case STRING:
-        factory.writeString(fieldName, resultSet.getString(columnIndex));
+        factory.writeString(fieldName, null);
         break;
       case CHAR:
-        char charValue = 0;
-        String columnValue = resultSet.getString(columnIndex);
-        if (columnValue != null && columnValue.length() > 0) {
-          charValue = columnValue.toCharArray()[0];
-        }
-        factory.writeChar(fieldName, charValue);
+        factory.writeChar(fieldName, (char) 0);
         break;
       case SHORT:
-        factory.writeShort(fieldName, resultSet.getShort(columnIndex));
+        factory.writeShort(fieldName, (short) 0);
         break;
       case INT:
-        factory.writeInt(fieldName, resultSet.getInt(columnIndex));
+        factory.writeInt(fieldName, 0);
         break;
       case LONG:
-        factory.writeLong(fieldName, resultSet.getLong(columnIndex));
+        factory.writeLong(fieldName, 0L);
         break;
       case FLOAT:
-        factory.writeFloat(fieldName, resultSet.getFloat(columnIndex));
+        factory.writeFloat(fieldName, 0);
         break;
       case DOUBLE:
-        factory.writeDouble(fieldName, resultSet.getDouble(columnIndex));
+        factory.writeDouble(fieldName, 0);
         break;
       case BYTE:
-        factory.writeByte(fieldName, resultSet.getByte(columnIndex));
+        factory.writeByte(fieldName, (byte) 0);
         break;
       case BOOLEAN:
-        factory.writeBoolean(fieldName, resultSet.getBoolean(columnIndex));
+        factory.writeBoolean(fieldName, false);
         break;
-      case DATE: {
-        factory.writeDate(fieldName, getPdxDate(columnIndex, columnMapping));
+      case DATE:
+        factory.writeDate(fieldName, null);
         break;
-      }
       case BYTE_ARRAY:
-        byte[] byteData;
-        if (isBlobColumn(columnMapping)) {
-          byteData = getBlobData(columnIndex);
-        } else {
-          byteData = resultSet.getBytes(columnIndex);
-        }
-        factory.writeByteArray(fieldName, byteData);
+        factory.writeByteArray(fieldName, null);
         break;
       case BOOLEAN_ARRAY:
-        factory.writeBooleanArray(fieldName,
-            convertJdbcObjectToJavaType(boolean[].class, resultSet.getObject(columnIndex)));
+        factory.writeBooleanArray(fieldName, null);
         break;
       case CHAR_ARRAY:
-        factory.writeCharArray(fieldName,
-            convertJdbcObjectToJavaType(char[].class, resultSet.getObject(columnIndex)));
+        factory.writeCharArray(fieldName, null);
         break;
       case SHORT_ARRAY:
-        factory.writeShortArray(fieldName,
-            convertJdbcObjectToJavaType(short[].class, resultSet.getObject(columnIndex)));
+        factory.writeShortArray(fieldName, null);
         break;
       case INT_ARRAY:
-        factory.writeIntArray(fieldName,
-            convertJdbcObjectToJavaType(int[].class, resultSet.getObject(columnIndex)));
+        factory.writeIntArray(fieldName, null);
         break;
       case LONG_ARRAY:
-        factory.writeLongArray(fieldName,
-            convertJdbcObjectToJavaType(long[].class, resultSet.getObject(columnIndex)));
+        factory.writeLongArray(fieldName, null);
         break;
       case FLOAT_ARRAY:
-        factory.writeFloatArray(fieldName,
-            convertJdbcObjectToJavaType(float[].class, resultSet.getObject(columnIndex)));
+        factory.writeFloatArray(fieldName, null);
         break;
       case DOUBLE_ARRAY:
-        factory.writeDoubleArray(fieldName,
-            convertJdbcObjectToJavaType(double[].class, resultSet.getObject(columnIndex)));
+        factory.writeDoubleArray(fieldName, null);
         break;
       case STRING_ARRAY:
-        factory.writeStringArray(fieldName,
-            convertJdbcObjectToJavaType(String[].class, resultSet.getObject(columnIndex)));
+        factory.writeStringArray(fieldName, null);
         break;
       case OBJECT_ARRAY:
-        factory.writeObjectArray(fieldName,
-            convertJdbcObjectToJavaType(Object[].class, resultSet.getObject(columnIndex)));
+        factory.writeObjectArray(fieldName, null);
         break;
       case ARRAY_OF_BYTE_ARRAYS:
-        factory.writeArrayOfByteArrays(fieldName,
-            convertJdbcObjectToJavaType(byte[][].class, resultSet.getObject(columnIndex)));
+        factory.writeArrayOfByteArrays(fieldName, null);
         break;
-      case OBJECT: {
-        Object v;
-        if (isBlobColumn(columnMapping)) {
-          v = getBlobData(columnIndex);
-        } else {
-          v = resultSet.getObject(columnIndex);
-          if (v instanceof java.util.Date) {
-            if (v instanceof java.sql.Date) {
-              java.sql.Date sqlDate = (java.sql.Date) v;
-              v = new java.util.Date(sqlDate.getTime());
-            } else if (v instanceof java.sql.Time) {
-              java.sql.Time sqlTime = (java.sql.Time) v;
-              v = new java.util.Date(sqlTime.getTime());
-            } else if (v instanceof java.sql.Timestamp) {
-              java.sql.Timestamp sqlTimestamp = (java.sql.Timestamp) v;
-              v = new java.util.Date(sqlTimestamp.getTime());
-            }
-          }
-        }
-        factory.writeObject(fieldName, v);
-        break;
-      }
-    }
-  }
-
-  private java.util.Date getPdxDate(int columnIndex, FieldMapping columnMapping)
-      throws SQLException {
-    java.util.Date sqlDate;
-    JDBCType columnType = JDBCType.valueOf(columnMapping.getJdbcType());
-    switch (columnType) {
-      case DATE:
-        sqlDate = resultSet.getDate(columnIndex);
-        break;
-      case TIME:
-      case TIME_WITH_TIMEZONE:
-        sqlDate = resultSet.getTime(columnIndex);
+      case OBJECT:
+        factory.writeObject(fieldName, null);
         break;
       default:
-        sqlDate = resultSet.getTimestamp(columnIndex);
-        break;
-    }
-    java.util.Date pdxDate = null;
-    if (sqlDate != null) {
-      pdxDate = new java.util.Date(sqlDate.getTime());
-    }
-    return pdxDate;
-  }
-
-  private boolean isBlobColumn(FieldMapping columnMapping) throws SQLException {
-    return JDBCType.BLOB.name().equals(columnMapping.getJdbcType());
-  }
-
-  /**
-   * If the given column contains a Blob returns its data as a byte array;
-   * otherwise return null.
-   *
-   * @throws JdbcConnectorException if blob is too big to fit in a byte array
-   */
-  private byte[] getBlobData(int columnIndex) throws SQLException {
-    Blob blob = resultSet.getBlob(columnIndex);
-    if (blob == null) {
-      return null;
-    }
-    try {
-      long blobLength = blob.length();
-      if (blobLength > Integer.MAX_VALUE) {
-        throw new JdbcConnectorException(
-            "Blob of length " + blobLength + " is too big to be converted to a byte array.");
-      }
-      return blob.getBytes(1, (int) blobLength);
-    } finally {
-      blob.free();
-    }
-  }
-
-  private <T> T convertJdbcObjectToJavaType(Class<T> javaType, Object jdbcObject) {
-    try {
-      return javaType.cast(jdbcObject);
-    } catch (ClassCastException classCastException) {
-      throw JdbcConnectorException.createException("Could not convert "
-          + jdbcObject.getClass().getTypeName() + " to " + javaType.getTypeName(),
-          classCastException);
+        throw new IllegalStateException("unhandled pdx field type " + fieldType);
     }
   }
 
   static FieldType computeFieldType(boolean isNullable, JDBCType jdbcType) {
     switch (jdbcType) {
+      case NULL:
+        throw new IllegalStateException("unexpected NULL jdbc column type");
+      case BOOLEAN:
+        return computeType(isNullable, FieldType.BOOLEAN);
       case BIT: // 1 bit
         return computeType(isNullable, FieldType.BOOLEAN);
       case TINYINT: // unsigned 8 bits
@@ -291,23 +170,19 @@ class SqlToPdxInstanceCreator {
       case LONGVARCHAR:
         return FieldType.STRING;
       case DATE:
-        return computeDate(isNullable);
+        return FieldType.DATE;
       case TIME:
-        return computeDate(isNullable);
+        return FieldType.DATE;
       case TIMESTAMP:
-        return computeDate(isNullable);
+        return FieldType.DATE;
       case BINARY:
         return FieldType.BYTE_ARRAY;
       case VARBINARY:
         return FieldType.BYTE_ARRAY;
       case LONGVARBINARY:
         return FieldType.BYTE_ARRAY;
-      case NULL:
-        throw new IllegalStateException("unexpected NULL jdbc column type");
       case BLOB:
         return FieldType.BYTE_ARRAY;
-      case BOOLEAN:
-        return computeType(isNullable, FieldType.BOOLEAN);
       case NCHAR:
         return FieldType.STRING;
       case NVARCHAR:
@@ -315,9 +190,9 @@ class SqlToPdxInstanceCreator {
       case LONGNVARCHAR:
         return FieldType.STRING;
       case TIME_WITH_TIMEZONE:
-        return computeDate(isNullable);
+        return FieldType.DATE;
       case TIMESTAMP_WITH_TIMEZONE:
-        return computeDate(isNullable);
+        return FieldType.DATE;
       default:
         return FieldType.OBJECT;
     }
@@ -330,9 +205,4 @@ class SqlToPdxInstanceCreator {
     return nonNullType;
 
   }
-
-  private static FieldType computeDate(boolean isNullable) {
-    return computeType(isNullable, FieldType.DATE);
-  }
-
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
index 7a54c4e..3030f74 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
@@ -24,6 +24,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.fake.Fakes;
 
@@ -47,7 +49,7 @@ public class AbstractJdbcCallbackTest {
 
   @Test
   public void checkInitializedDoesNothingIfInitialized() {
-    jdbcCallback.checkInitialized(mock(InternalCache.class));
+    jdbcCallback.checkInitialized(mock(Region.class));
     assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
   }
 
@@ -55,11 +57,16 @@ public class AbstractJdbcCallbackTest {
   public void initializedSqlHandlerIfNoneExists() {
     jdbcCallback = new AbstractJdbcCallback() {};
     InternalCache cache = mock(InternalCache.class);
+    Region region = mock(Region.class);
+    when(region.getRegionService()).thenReturn(cache);
+    when(region.getName()).thenReturn("regionName");
     JdbcConnectorService service = mock(JdbcConnectorService.class);
     when(cache.getService(any())).thenReturn(service);
     assertThat(jdbcCallback.getSqlHandler()).isNull();
+    RegionMapping regionMapping = mock(RegionMapping.class);
+    when(service.getMappingForRegion("regionName")).thenReturn(regionMapping);
 
-    jdbcCallback.checkInitialized(cache);
+    jdbcCallback.checkInitialized(region);
 
     assertThat(jdbcCallback.getSqlHandler()).isNotNull();
   }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index 53c1476..2112daa 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -52,6 +52,7 @@ import org.apache.geode.connectors.jdbc.internal.SqlHandler.DataSourceFactory;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.internal.PdxInstanceImpl;
 import org.apache.geode.pdx.internal.PdxType;
 
@@ -86,6 +87,10 @@ public class SqlHandlerTest {
     region = mock(Region.class);
     when(region.getName()).thenReturn(REGION_NAME);
     cache = mock(InternalCache.class);
+    PdxInstance pdxInstance = mock(PdxInstance.class);
+    PdxInstanceFactory pdxInstanceFactory = mock(PdxInstanceFactory.class);
+    when(pdxInstanceFactory.create()).thenReturn(pdxInstance);
+    when(cache.createPdxInstanceFactory(any())).thenReturn(pdxInstanceFactory);
     connection = mock(Connection.class);
     when(region.getRegionService()).thenReturn(cache);
     tableMetaDataManager = mock(TableMetaDataManager.class);
@@ -98,7 +103,6 @@ public class SqlHandlerTest {
     connectorService = mock(JdbcConnectorService.class);
     dataSourceFactory = mock(DataSourceFactory.class);
     when(dataSourceFactory.getDataSource(DATA_SOURCE_NAME)).thenReturn(dataSource);
-    handler = new SqlHandler(tableMetaDataManager, connectorService, dataSourceFactory);
     key = "key";
     value = mock(PdxInstanceImpl.class);
     when(value.getPdxType()).thenReturn(mock(PdxType.class));
@@ -114,6 +118,8 @@ public class SqlHandlerTest {
 
     statement = mock(PreparedStatement.class);
     when(this.connection.prepareStatement(any())).thenReturn(statement);
+    handler = new SqlHandler(cache, REGION_NAME, tableMetaDataManager, connectorService,
+        dataSourceFactory, true);
   }
 
   @Test
@@ -122,29 +128,25 @@ public class SqlHandlerTest {
     handler.read(region, null);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
-  public void readThrowsIfNoMapping() throws Exception {
-    Region region = mock(Region.class);
-    when(region.getName()).thenReturn("myRegionName");
+  public void constructorThrowsIfNoMapping() throws Exception {
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage(
-        "JDBC mapping for region myRegionName not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
-    handler.read(region, new Object());
+        "JDBC mapping for region regionWithNoMapping not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
+
+    new SqlHandler(cache, "regionWithNoMapping", tableMetaDataManager, connectorService,
+        dataSourceFactory, true);
   }
 
   @Test
-  public void readThrowsIfNoConnectionConfig() throws Exception {
-    @SuppressWarnings("unchecked")
-    Region<Object, Object> region2 = mock(Region.class);
-    when(region2.getName()).thenReturn("region2");
-    RegionMapping regionMapping2 = mock(RegionMapping.class);
-    when(regionMapping2.getDataSourceName()).thenReturn("bogus data source name");
-    when(regionMapping2.getRegionName()).thenReturn("region2");
-    when(connectorService.getMappingForRegion("region2")).thenReturn(regionMapping2);
-
+  public void constructorThrowsIfNoConnectionConfig() throws Exception {
+    when(regionMapping.getDataSourceName()).thenReturn("bogus data source name");
     thrown.expect(JdbcConnectorException.class);
-    handler.read(region2, new Object());
+    thrown.expectMessage(
+        "JDBC data-source named \"bogus data source name\" not found. Create it with gfsh 'create data-source --pooled --name=bogus data source name'.");
+
+    new SqlHandler(cache, REGION_NAME, tableMetaDataManager, connectorService, dataSourceFactory,
+        true);
   }
 
   @Test
@@ -804,7 +806,7 @@ public class SqlHandlerTest {
   public void handlesSQLExceptionFromGetConnection() throws Exception {
     doThrow(new SQLException("test exception")).when(dataSource).getConnection();
 
-    assertThatThrownBy(() -> handler.getConnection(DATA_SOURCE_NAME))
+    assertThatThrownBy(() -> handler.getConnection())
         .isInstanceOf(SQLException.class).hasMessage("test exception");
   }
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
index d4b7214..a9d2a8b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
@@ -16,20 +16,14 @@ package org.apache.geode.connectors.jdbc.internal;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.sql.Blob;
 import java.sql.JDBCType;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.Date;
+import java.util.Arrays;
+import java.util.Map;
 
 import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
@@ -39,7 +33,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 
-import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.connectors.jdbc.internal.SqlToPdxInstance.PdxFieldInfo;
 import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.internal.cache.InternalCache;
@@ -59,11 +53,12 @@ public class SqlToPdxInstanceCreatorTest {
   private static final String KEY_COLUMN = "keyColumn";
   private static final String PDX_FIELD_NAME_1 = COLUMN_NAME_1.toLowerCase();
   private static final String PDX_FIELD_NAME_2 = COLUMN_NAME_2.toLowerCase();
+  private static final String PDX_CLASS_NAME = "myPdxClassName";
 
   private InternalCache cache;
   private RegionMapping regionMapping;
   private FieldMapping columnMapping = mock(FieldMapping.class);
-  private ResultSet resultSet;
+  private PdxInstance pdxTemplate = mock(PdxInstance.class);
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -77,734 +72,163 @@ public class SqlToPdxInstanceCreatorTest {
     when(columnMapping.getPdxName()).thenReturn(PDX_FIELD_NAME_1);
     when(columnMapping.getPdxType()).thenReturn(FieldType.OBJECT.name());
     when(regionMapping.getFieldMappingByJdbcName(COLUMN_NAME_1)).thenReturn(columnMapping);
-    resultSet = mock(ResultSet.class);
+    when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(columnMapping));
   }
 
   @Test
-  public void usesPdxFactoryForClassWhenExists() throws Exception {
-    String pdxClassName = "classname";
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    when(resultSet.next()).thenReturn(false);
-
-    createPdxInstance();
-
-    verify(cache).createPdxInstanceFactory(pdxClassName);
-    verifyNoMoreInteractions(cache);
-  }
-
-  @Test
-  public void readReturnsNullIfNoResultsReturned() throws Exception {
-    when(resultSet.next()).thenReturn(false);
-
-    PdxInstance pdxInstance = createPdxInstance();
-
-    assertThat(pdxInstance).isNull();
-  }
-
-  @Test
-  public void readResultIncludesKeyColumnInPdxValue() throws Exception {
-    setupResultSetForTwoObjectColumns(resultSet);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    FieldMapping columnMapping2 = mock(FieldMapping.class);
-    when(columnMapping2.getJdbcName()).thenReturn(COLUMN_NAME_2);
-    when(columnMapping2.getPdxName()).thenReturn(PDX_FIELD_NAME_2);
-    when(columnMapping2.getPdxType()).thenReturn(FieldType.OBJECT.name());
-    when(regionMapping.getFieldMappingByJdbcName(COLUMN_NAME_2)).thenReturn(columnMapping2);
-
-    createPdxInstance();
-
-    verify(factory).writeObject(PDX_FIELD_NAME_1, COLUMN_VALUE_1);
-    verify(factory).writeObject(PDX_FIELD_NAME_2, COLUMN_VALUE_2);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readReturnsDataFromAllResultColumns() throws Exception {
-    setupResultSetForTwoObjectColumns(resultSet);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    FieldMapping columnMapping2 = mock(FieldMapping.class);
-    when(columnMapping2.getJdbcName()).thenReturn(COLUMN_NAME_2);
-    when(columnMapping2.getPdxName()).thenReturn(PDX_FIELD_NAME_2);
-    when(columnMapping2.getPdxType()).thenReturn(FieldType.OBJECT.name());
-    when(regionMapping.getFieldMappingByJdbcName(COLUMN_NAME_2)).thenReturn(columnMapping2);
-
-    createPdxInstance();
-
-    verify(factory).writeObject(PDX_FIELD_NAME_1, COLUMN_VALUE_1);
-    verify(factory).writeObject(PDX_FIELD_NAME_2, COLUMN_VALUE_2);
-    verify(factory).create();
-  }
-
-  @Test
-  public void usesMappedPdxFieldNameWhenReading() throws Exception {
-    setupResultSet(resultSet, FieldType.OBJECT, COLUMN_VALUE_1);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String fieldName1 = "pdxFieldName1";
-    when(columnMapping.getPdxName()).thenReturn(fieldName1);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-
-    createPdxInstance();
-
-    verify(factory).writeObject(fieldName1, COLUMN_VALUE_1);
-    verify(factory).create();
-  }
-
-  @Test
-  public void fieldsAreNotWrittenIfNoFieldMapping() throws Exception {
-    setupResultSet(resultSet, FieldType.OBJECT, COLUMN_VALUE_1);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    when(regionMapping.getFieldMappingByJdbcName(COLUMN_NAME_1)).thenReturn(null);
+  @Parameters(source = FieldType.class)
+  public void readWritesFieldGivenPdxFieldType(FieldType fieldType) throws Exception {
+    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
+    when(columnMapping.getJdbcType()).thenReturn(JDBCType.NULL.name());
+    when(columnMapping.getPdxType()).thenReturn(fieldType.name());
 
-    createPdxInstance();
+    createSqlToPdxInstance();
 
+    verifyPdxFactoryWrite(factory, fieldType);
     verify(factory).create();
-    verifyNoMoreInteractions(factory);
   }
 
   @Test
-  public void readingNonNullIntegerColumnWithNoPdxTypeCausesIntPdxFieldToBeWritten()
+  public void pdxFieldGeneratedFromColumnNameAndTypeGivenNoPdxNameAndNoTypeInRegistry()
       throws Exception {
-    setupResultSet(resultSet, FieldType.INT, Integer.valueOf(1979));
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    TypeRegistry pdxTypeRegistry = mock(TypeRegistry.class);
-    when(cache.getPdxRegistry()).thenReturn(pdxTypeRegistry);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
+    PdxInstanceFactory factory = setupPdxInstanceFactory(null);
+    when(columnMapping.getJdbcType()).thenReturn(JDBCType.VARCHAR.name());
     when(columnMapping.getPdxName()).thenReturn("");
     when(columnMapping.getPdxType()).thenReturn("");
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.INTEGER.name());
-    when(columnMapping.isJdbcNullable()).thenReturn(false);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
+    when(cache.getPdxRegistry()).thenReturn(typeRegistry);
 
-    createPdxInstance();
+    SqlToPdxInstance result = createSqlToPdxInstance();
 
-    verify(factory).writeInt(COLUMN_NAME_1, 1979);
+    verify(factory).writeString(COLUMN_NAME_1, null);
     verify(factory).create();
+    assertThat(result).isNotNull();
+    assertThat(result.getPdxTemplate()).isSameAs(pdxTemplate);
+    Map<String, PdxFieldInfo> map = result.getColumnToPdxFieldMap();
+    assertThat(map).hasSize(1);
+    assertThat(map).containsKey(COLUMN_NAME_1);
+    assertThat(map.get(COLUMN_NAME_1).getName()).isEqualTo(COLUMN_NAME_1);
+    assertThat(map.get(COLUMN_NAME_1).getType()).isEqualTo(FieldType.STRING);
   }
 
   @Test
-  public void readingNullableIntegerColumnWithNoPdxTypeCausesObjectPdxFieldToBeWritten()
+  public void pdxFieldGeneratedFromRegistryPdxFieldGivenNoPdxNameAndTypeInRegistry()
       throws Exception {
-    setupResultSet(resultSet, FieldType.OBJECT, Integer.valueOf(1979));
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    TypeRegistry pdxTypeRegistry = mock(TypeRegistry.class);
-    when(cache.getPdxRegistry()).thenReturn(pdxTypeRegistry);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
+    PdxInstanceFactory factory = setupPdxInstanceFactory(null);
+    when(columnMapping.getJdbcType()).thenReturn(JDBCType.NULL.name());
     when(columnMapping.getPdxName()).thenReturn("");
     when(columnMapping.getPdxType()).thenReturn("");
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.INTEGER.name());
-    when(columnMapping.isJdbcNullable()).thenReturn(true);
-
-    createPdxInstance();
-
-    verify(factory).writeObject(COLUMN_NAME_1, Integer.valueOf(1979));
-    verify(factory).create();
-  }
-
-  @Test
-  public void fieldWritingGivesPrecedenceToFindFieldThatMatchesNameWhenRegionMappingPdxNameIsEmpty()
-      throws Exception {
-    setupResultSet(resultSet, FieldType.SHORT, Short.valueOf((short) 1979));
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    TypeRegistry pdxTypeRegistry = mock(TypeRegistry.class);
+    TypeRegistry typeRegistry = mock(TypeRegistry.class);
     PdxField pdxField = mock(PdxField.class);
-    when(pdxField.getFieldName()).thenReturn("myPdxField1");
-    when(pdxField.getFieldType()).thenReturn(FieldType.SHORT);
-    when(pdxTypeRegistry.findFieldThatMatchesName(pdxClassName, COLUMN_NAME_1))
-        .thenReturn(pdxField);
-    when(cache.getPdxRegistry()).thenReturn(pdxTypeRegistry);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    when(columnMapping.getPdxName()).thenReturn("");
-    when(columnMapping.getPdxType()).thenReturn("");
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.INTEGER.name());
-    when(columnMapping.isJdbcNullable()).thenReturn(true);
+    when(pdxField.getFieldName()).thenReturn("customPdxFieldName");
+    when(pdxField.getFieldType()).thenReturn(FieldType.OBJECT);
+    when(typeRegistry.findFieldThatMatchesName(PDX_CLASS_NAME, COLUMN_NAME_1)).thenReturn(pdxField);
+    when(cache.getPdxRegistry()).thenReturn(typeRegistry);
 
-    createPdxInstance();
+    SqlToPdxInstance result = createSqlToPdxInstance();
 
-    verify(factory).writeShort("myPdxField1", (short) 1979);
+    verify(factory).writeObject("customPdxFieldName", null);
     verify(factory).create();
+    assertThat(result).isNotNull();
+    assertThat(result.getPdxTemplate()).isSameAs(pdxTemplate);
+    Map<String, PdxFieldInfo> map = result.getColumnToPdxFieldMap();
+    assertThat(map).hasSize(1);
+    assertThat(map).containsKey(COLUMN_NAME_1);
+    assertThat(map.get(COLUMN_NAME_1).getName()).isEqualTo("customPdxFieldName");
+    assertThat(map.get(COLUMN_NAME_1).getType()).isEqualTo(FieldType.OBJECT);
   }
 
-  @Test
-  @Parameters(source = FieldType.class)
-  public void readWritesFieldGivenPdxFieldType(FieldType fieldType) throws Exception {
-    setupResultSet(resultSet, fieldType);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.NULL.name());
-    when(columnMapping.getPdxType()).thenReturn(fieldType.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType);
-    verify(factory).create();
-  }
-
-  private PdxInstance createPdxInstance() throws SQLException {
+  private SqlToPdxInstance createSqlToPdxInstance() throws SQLException {
     SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
-        new SqlToPdxInstanceCreator(cache, regionMapping, resultSet);
+        new SqlToPdxInstanceCreator(cache, regionMapping);
     return sqlToPdxInstanceCreator.create();
   }
 
-  @Test
-  @Parameters(source = FieldType.class)
-  public void readOfNullWritesFieldGivenPdxFieldType(FieldType fieldType) throws Exception {
-    setupResultSet(resultSet, fieldType, null);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.NULL.name());
-    when(columnMapping.getPdxType()).thenReturn(fieldType.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, null);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfCharFieldWithEmptyStringWritesCharZero() throws Exception {
-    char expectedValue = 0;
-    FieldType fieldType = FieldType.CHAR;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    when(resultSet.getString(1)).thenReturn("");
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfDateFieldWithDateColumnWritesDate() throws Exception {
-    FieldType fieldType = FieldType.DATE;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    java.sql.Date sqlDate = java.sql.Date.valueOf("1979-09-11");
-    Date expectedValue = new Date(sqlDate.getTime());
-    when(resultSet.getDate(1)).thenReturn(sqlDate);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.DATE.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfByteArrayFieldWithBlob() throws Exception {
-    FieldType fieldType = FieldType.BYTE_ARRAY;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    byte[] expectedValue = new byte[] {1, 2, 3};
-    Blob blob = mock(Blob.class);
-    when(blob.length()).thenReturn((long) expectedValue.length);
-    when(blob.getBytes(1, expectedValue.length)).thenReturn(expectedValue);
-    when(resultSet.getBlob(1)).thenReturn(blob);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.BLOB.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfByteArrayFieldWithNullBlob() throws Exception {
-    FieldType fieldType = FieldType.BYTE_ARRAY;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    byte[] expectedValue = null;
-    Blob blob = null;
-    when(resultSet.getBlob(1)).thenReturn(blob);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.BLOB.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfByteArrayFieldWithHugeBlobThrows() throws Exception {
-    FieldType fieldType = FieldType.BYTE_ARRAY;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    Blob blob = mock(Blob.class);
-    when(blob.length()).thenReturn((long) Integer.MAX_VALUE + 1);
-    when(resultSet.getBlob(1)).thenReturn(blob);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.BLOB.name());
-    thrown.expect(JdbcConnectorException.class);
-    thrown.expectMessage("Blob of length 2147483648 is too big to be converted to a byte array.");
-
-    createPdxInstance();
-  }
-
-  @Test
-  public void readOfObjectFieldWithBlob() throws Exception {
-    FieldType fieldType = FieldType.OBJECT;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    byte[] expectedValue = new byte[] {1, 2, 3};
-    Blob blob = mock(Blob.class);
-    when(blob.length()).thenReturn((long) expectedValue.length);
-    when(blob.getBytes(1, expectedValue.length)).thenReturn(expectedValue);
-    when(resultSet.getBlob(1)).thenReturn(blob);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.BLOB.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfDateFieldWithTimeColumnWritesDate() throws Exception {
-    FieldType fieldType = FieldType.DATE;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    java.sql.Time sqlTime = java.sql.Time.valueOf("22:33:44");
-    Date expectedValue = new Date(sqlTime.getTime());
-    when(resultSet.getTime(1)).thenReturn(sqlTime);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.TIME.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfDateFieldWithTimestampColumnWritesDate() throws Exception {
-    FieldType fieldType = FieldType.DATE;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1979-09-11 22:33:44.567");
-    Date expectedValue = new Date(sqlTimestamp.getTime());
-    when(resultSet.getTimestamp(1)).thenReturn(sqlTimestamp);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-    when(columnMapping.getJdbcType()).thenReturn(JDBCType.TIMESTAMP.name());
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfObjectFieldWithDateColumnWritesDate() throws Exception {
-    FieldType fieldType = FieldType.OBJECT;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    java.sql.Date sqlDate = java.sql.Date.valueOf("1979-09-11");
-    Date expectedValue = new Date(sqlDate.getTime());
-    when(resultSet.getObject(1)).thenReturn(sqlDate);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfObjectFieldWithJavaUtilDateWritesDate() throws Exception {
-    FieldType fieldType = FieldType.OBJECT;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    Date expectedValue = new Date();
-    when(resultSet.getObject(1)).thenReturn(expectedValue);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfObjectFieldWithTimeColumnWritesDate() throws Exception {
-    FieldType fieldType = FieldType.OBJECT;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    java.sql.Time sqlTime = java.sql.Time.valueOf("22:33:44");
-    Date expectedValue = new Date(sqlTime.getTime());
-    when(resultSet.getObject(1)).thenReturn(sqlTime);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  public void readOfObjectFieldWithTimestampColumnWritesDate() throws Exception {
-    FieldType fieldType = FieldType.OBJECT;
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(resultSet.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1979-09-11 22:33:44.567");
-    Date expectedValue = new Date(sqlTimestamp.getTime());
-    when(resultSet.getObject(1)).thenReturn(sqlTimestamp);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    PdxInstanceFactory factory = setupPdxInstanceFactory(fieldType);
-
-    createPdxInstance();
-
-    verifyPdxFactoryWrite(factory, fieldType, expectedValue);
-    verify(factory).create();
-  }
-
-  @Test
-  @Parameters({"BOOLEAN_ARRAY", "OBJECT_ARRAY", "CHAR_ARRAY", "SHORT_ARRAY", "INT_ARRAY",
-      "LONG_ARRAY", "FLOAT_ARRAY", "DOUBLE_ARRAY", "STRING_ARRAY", "ARRAY_OF_BYTE_ARRAYS"})
-  public void throwsExceptionWhenReadWritesUnsupportedType(FieldType fieldType) throws Exception {
-    String returnValue = "ReturnValue";
-    setupPdxInstanceFactory(fieldType);
-    setupResultSetForObject(resultSet, returnValue);
-    when(resultSet.next()).thenReturn(true).thenReturn(false);
-    thrown.expect(JdbcConnectorException.class);
-    thrown.expectMessage("Could not convert ");
-
-    createPdxInstance();
-  }
-
-  @Test
-  public void throwsExceptionIfMoreThanOneResultReturned() throws Exception {
-    setupResultSet(resultSet, FieldType.OBJECT);
-    when(resultSet.next()).thenReturn(true);
-    when(resultSet.getStatement()).thenReturn(mock(PreparedStatement.class));
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    thrown.expect(JdbcConnectorException.class);
-    thrown.expectMessage("Multiple rows returned for query: ");
-
-    createPdxInstance();
-  }
-
-  private void setupResultSetForTwoObjectColumns(ResultSet result) throws SQLException {
-    setupResultSet(result, FieldType.OBJECT);
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(result.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(2);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
-    when(result.getObject(1)).thenReturn(COLUMN_VALUE_1);
-    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
-  }
-
-  private void setupResultSet(ResultSet result, FieldType fieldType) throws SQLException {
-    setupResultSet(result, fieldType, getValueByFieldType(fieldType));
-  }
-
-  private void setupResultSet(ResultSet result, FieldType fieldType, Object value)
-      throws SQLException {
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(result.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(1);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-
-    switch (fieldType) {
-      case STRING:
-        when(result.getString(1)).thenReturn((String) value);
-        break;
-      case CHAR:
-        Character charValue = (Character) value;
-        when(result.getString(1)).thenReturn(value == null ? null : charValue.toString());
-        break;
-      case SHORT:
-        when(result.getShort(1)).thenReturn(value == null ? 0 : (Short) value);
-        break;
-      case INT:
-        when(result.getInt(1)).thenReturn(value == null ? 0 : (Integer) value);
-        break;
-      case LONG:
-        when(result.getLong(1)).thenReturn(value == null ? 0 : (Long) value);
-        break;
-      case FLOAT:
-        when(result.getFloat(1)).thenReturn(value == null ? 0 : (Float) value);
-        break;
-      case DOUBLE:
-        when(result.getDouble(1)).thenReturn(value == null ? 0 : (Double) value);
-        break;
-      case BYTE:
-        when(result.getByte(1)).thenReturn(value == null ? 0 : (Byte) value);
-        break;
-      case BOOLEAN:
-        when(result.getBoolean(1)).thenReturn(value == null ? false : (Boolean) value);
-        break;
-      case DATE:
-        Date date = (Date) value;
-        java.sql.Timestamp sqlTimeStamp = null;
-        if (date != null) {
-          sqlTimeStamp = new java.sql.Timestamp(date.getTime());
-        }
-        when(result.getTimestamp(1)).thenReturn(sqlTimeStamp);
-        break;
-      case BYTE_ARRAY:
-        when(result.getBytes(1)).thenReturn((byte[]) value);
-        break;
-      case BOOLEAN_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case CHAR_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case SHORT_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case INT_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case LONG_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case FLOAT_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case DOUBLE_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case STRING_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case OBJECT_ARRAY:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case ARRAY_OF_BYTE_ARRAYS:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      case OBJECT:
-        when(result.getObject(1)).thenReturn(value);
-        break;
-      default:
-        throw new IllegalStateException("unhandled fieldType " + fieldType);
-    }
-
-  }
-
-  private static byte[][] arrayOfByteArray = new byte[][] {{1, 2}, {3, 4}};
-
-  @SuppressWarnings("unchecked")
-  private <T> T getValueByFieldType(FieldType fieldType) {
-    switch (fieldType) {
-      case STRING:
-        return (T) "stringValue";
-      case CHAR:
-        return (T) Character.valueOf('A');
-      case SHORT:
-        return (T) Short.valueOf((short) 36);
-      case INT:
-        return (T) Integer.valueOf(36);
-      case LONG:
-        return (T) Long.valueOf(36);
-      case FLOAT:
-        return (T) Float.valueOf(36);
-      case DOUBLE:
-        return (T) Double.valueOf(36);
-      case BYTE:
-        return (T) Byte.valueOf((byte) 36);
-      case BOOLEAN:
-        return (T) Boolean.TRUE;
-      case DATE:
-        return (T) new Date(1000);
-      case BYTE_ARRAY:
-        return (T) new byte[] {1, 2};
-      case BOOLEAN_ARRAY:
-        return (T) new boolean[] {true, false};
-      case CHAR_ARRAY:
-        return (T) new char[] {1, 2};
-      case SHORT_ARRAY:
-        return (T) new short[] {1, 2};
-      case INT_ARRAY:
-        return (T) new int[] {1, 2};
-      case LONG_ARRAY:
-        return (T) new long[] {1, 2};
-      case FLOAT_ARRAY:
-        return (T) new float[] {1, 2};
-      case DOUBLE_ARRAY:
-        return (T) new double[] {1, 2};
-      case STRING_ARRAY:
-        return (T) new String[] {"1", "2"};
-      case OBJECT_ARRAY:
-        return (T) new Object[] {1, 2};
-      case ARRAY_OF_BYTE_ARRAYS:
-        return (T) arrayOfByteArray;
-      case OBJECT:
-        return (T) "objectValue";
-      default:
-        throw new IllegalStateException("unhandled fieldType " + fieldType);
-    }
-  }
-
   private PdxInstanceFactory setupPdxInstanceFactory(FieldType fieldType) {
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    String pdxClassName = "myPdxClassName";
-    when(cache.createPdxInstanceFactory(pdxClassName)).thenReturn(factory);
-
-    when(regionMapping.getPdxName()).thenReturn(pdxClassName);
-    when(columnMapping.getPdxType()).thenReturn(fieldType.name());
+    when(factory.create()).thenReturn(pdxTemplate);
+    when(cache.createPdxInstanceFactory(PDX_CLASS_NAME)).thenReturn(factory);
 
+    when(regionMapping.getPdxName()).thenReturn(PDX_CLASS_NAME);
+    if (fieldType != null) {
+      when(columnMapping.getPdxType()).thenReturn(fieldType.name());
+    }
     return factory;
   }
 
   private void verifyPdxFactoryWrite(PdxInstanceFactory factory, FieldType fieldType) {
-    verifyPdxFactoryWrite(factory, fieldType, getValueByFieldType(fieldType));
-  }
-
-  private void verifyPdxFactoryWrite(PdxInstanceFactory factory, FieldType fieldType,
-      Object value) {
     switch (fieldType) {
       case STRING:
-        verify(factory).writeString(PDX_FIELD_NAME_1, (String) value);
+        verify(factory).writeString(PDX_FIELD_NAME_1, null);
         break;
       case CHAR:
-        verify(factory).writeChar(PDX_FIELD_NAME_1, value == null ? 0 : (char) value);
+        verify(factory).writeChar(PDX_FIELD_NAME_1, (char) 0);
         break;
       case SHORT:
-        verify(factory).writeShort(PDX_FIELD_NAME_1, value == null ? 0 : (short) value);
+        verify(factory).writeShort(PDX_FIELD_NAME_1, (short) 0);
         break;
       case INT:
-        verify(factory).writeInt(PDX_FIELD_NAME_1, value == null ? 0 : (int) value);
+        verify(factory).writeInt(PDX_FIELD_NAME_1, 0);
         break;
       case LONG:
-        verify(factory).writeLong(PDX_FIELD_NAME_1, value == null ? 0 : (long) value);
+        verify(factory).writeLong(PDX_FIELD_NAME_1, 0);
         break;
       case FLOAT:
-        verify(factory).writeFloat(PDX_FIELD_NAME_1, value == null ? 0 : (float) value);
+        verify(factory).writeFloat(PDX_FIELD_NAME_1, 0);
         break;
       case DOUBLE:
-        verify(factory).writeDouble(PDX_FIELD_NAME_1, value == null ? 0 : (double) value);
+        verify(factory).writeDouble(PDX_FIELD_NAME_1, 0);
         break;
       case BYTE:
-        verify(factory).writeByte(PDX_FIELD_NAME_1, value == null ? 0 : (byte) value);
+        verify(factory).writeByte(PDX_FIELD_NAME_1, (byte) 0);
         break;
       case BOOLEAN:
-        verify(factory).writeBoolean(PDX_FIELD_NAME_1, value == null ? false : (boolean) value);
+        verify(factory).writeBoolean(PDX_FIELD_NAME_1, false);
         break;
       case DATE:
-        verify(factory).writeDate(PDX_FIELD_NAME_1, (Date) value);
+        verify(factory).writeDate(PDX_FIELD_NAME_1, null);
         break;
       case BYTE_ARRAY:
-        verify(factory).writeByteArray(PDX_FIELD_NAME_1, (byte[]) value);
+        verify(factory).writeByteArray(PDX_FIELD_NAME_1, null);
         break;
       case BOOLEAN_ARRAY:
-        verify(factory).writeBooleanArray(PDX_FIELD_NAME_1, (boolean[]) value);
+        verify(factory).writeBooleanArray(PDX_FIELD_NAME_1, null);
         break;
       case CHAR_ARRAY:
-        verify(factory).writeCharArray(PDX_FIELD_NAME_1, (char[]) value);
+        verify(factory).writeCharArray(PDX_FIELD_NAME_1, null);
         break;
       case SHORT_ARRAY:
-        verify(factory).writeShortArray(PDX_FIELD_NAME_1, (short[]) value);
+        verify(factory).writeShortArray(PDX_FIELD_NAME_1, null);
         break;
       case INT_ARRAY:
-        verify(factory).writeIntArray(PDX_FIELD_NAME_1, (int[]) value);
+        verify(factory).writeIntArray(PDX_FIELD_NAME_1, null);
         break;
       case LONG_ARRAY:
-        verify(factory).writeLongArray(PDX_FIELD_NAME_1, (long[]) value);
+        verify(factory).writeLongArray(PDX_FIELD_NAME_1, null);
         break;
       case FLOAT_ARRAY:
-        verify(factory).writeFloatArray(PDX_FIELD_NAME_1, (float[]) value);
+        verify(factory).writeFloatArray(PDX_FIELD_NAME_1, null);
         break;
       case DOUBLE_ARRAY:
-        verify(factory).writeDoubleArray(PDX_FIELD_NAME_1, (double[]) value);
+        verify(factory).writeDoubleArray(PDX_FIELD_NAME_1, null);
         break;
       case STRING_ARRAY:
-        verify(factory).writeStringArray(PDX_FIELD_NAME_1, (String[]) value);
+        verify(factory).writeStringArray(PDX_FIELD_NAME_1, null);
         break;
       case OBJECT_ARRAY:
-        verify(factory).writeObjectArray(PDX_FIELD_NAME_1, (Object[]) value);
+        verify(factory).writeObjectArray(PDX_FIELD_NAME_1, null);
         break;
       case ARRAY_OF_BYTE_ARRAYS:
-        verify(factory).writeArrayOfByteArrays(PDX_FIELD_NAME_1, (byte[][]) value);
+        verify(factory).writeArrayOfByteArrays(PDX_FIELD_NAME_1, null);
         break;
       case OBJECT:
-        verify(factory).writeObject(PDX_FIELD_NAME_1, value);
+        verify(factory).writeObject(PDX_FIELD_NAME_1, null);
         break;
       default:
         throw new IllegalStateException("unhandled fieldType " + fieldType);
     }
   }
 
-  private void setupResultSetForObject(ResultSet result, Object objectToReturn)
-      throws SQLException {
-    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
-    when(result.getMetaData()).thenReturn(metaData);
-    when(metaData.getColumnCount()).thenReturn(2);
-    when(result.getObject(1)).thenReturn(objectToReturn);
-    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
-
-    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
-    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
-  }
-
   @Test
   public void computeFieldTypeTest() {
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.BOOLEAN))
@@ -846,25 +270,25 @@ public class SqlToPdxInstanceCreatorTest {
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.DATE))
         .isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.DATE))
-        .isEqualTo(FieldType.OBJECT);
+        .isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.TIME))
         .isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.TIME))
-        .isEqualTo(FieldType.OBJECT);
+        .isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.TIMESTAMP))
         .isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.TIMESTAMP))
-        .isEqualTo(FieldType.OBJECT);
+        .isEqualTo(FieldType.DATE);
     assertThat(
         SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.TIME_WITH_TIMEZONE))
             .isEqualTo(FieldType.DATE);
     assertThat(
         SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.TIME_WITH_TIMEZONE))
-            .isEqualTo(FieldType.OBJECT);
+            .isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false,
         JDBCType.TIMESTAMP_WITH_TIMEZONE)).isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(true,
-        JDBCType.TIMESTAMP_WITH_TIMEZONE)).isEqualTo(FieldType.OBJECT);
+        JDBCType.TIMESTAMP_WITH_TIMEZONE)).isEqualTo(FieldType.DATE);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.CHAR))
         .isEqualTo(FieldType.STRING);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.VARCHAR))
@@ -889,6 +313,30 @@ public class SqlToPdxInstanceCreatorTest {
             .isEqualTo(FieldType.BYTE_ARRAY);
     assertThat(SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.ROWID))
         .isEqualTo(FieldType.OBJECT);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.CHAR))
+        .isEqualTo(FieldType.STRING);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.VARCHAR))
+        .isEqualTo(FieldType.STRING);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.LONGVARCHAR))
+        .isEqualTo(FieldType.STRING);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.NCHAR))
+        .isEqualTo(FieldType.STRING);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.NVARCHAR))
+        .isEqualTo(FieldType.STRING);
+    assertThat(
+        SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.LONGNVARCHAR))
+            .isEqualTo(FieldType.STRING);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.BLOB))
+        .isEqualTo(FieldType.BYTE_ARRAY);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.BINARY))
+        .isEqualTo(FieldType.BYTE_ARRAY);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.VARBINARY))
+        .isEqualTo(FieldType.BYTE_ARRAY);
+    assertThat(
+        SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.LONGVARBINARY))
+            .isEqualTo(FieldType.BYTE_ARRAY);
+    assertThat(SqlToPdxInstanceCreator.computeFieldType(true, JDBCType.ROWID))
+        .isEqualTo(FieldType.OBJECT);
     Throwable throwable = catchThrowable(
         () -> SqlToPdxInstanceCreator.computeFieldType(false, JDBCType.NULL));
     assertThat(throwable).isInstanceOf(IllegalStateException.class);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java
similarity index 99%
copy from geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
copy to geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java
index d4b7214..9430ea6 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java
@@ -50,7 +50,7 @@ import org.apache.geode.pdx.internal.PdxField;
 import org.apache.geode.pdx.internal.TypeRegistry;
 
 @RunWith(JUnitParamsRunner.class)
-public class SqlToPdxInstanceCreatorTest {
+public class SqlToPdxInstanceTest {
 
   private static final String COLUMN_NAME_1 = "columnName1";
   private static final Object COLUMN_VALUE_1 = "columnValue1";
@@ -264,9 +264,7 @@ public class SqlToPdxInstanceCreatorTest {
   }
 
   private PdxInstance createPdxInstance() throws SQLException {
-    SqlToPdxInstanceCreator sqlToPdxInstanceCreator =
-        new SqlToPdxInstanceCreator(cache, regionMapping, resultSet);
-    return sqlToPdxInstanceCreator.create();
+    return new SqlToPdxInstance().create(resultSet);
   }
 
   @Test