You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nr...@apache.org on 2017/11/10 23:04:41 UTC

[geode] 02/02: Rearchitect JDBC connector for easier testing Add new system for handling configuration Expand test coverage for connector

This is an automated email from the ASF dual-hosted git repository.

nreich pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c5325c118e5a5c944cdd0490e6988e8bbd9f0873
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Fri Nov 10 14:59:33 2017 -0800

    Rearchitect JDBC connector for easier testing
    Add new system for handling configuration
    Expand test coverage for connector
---
 .../geode/connectors/jdbc/JDBCAsyncWriter.java     |   19 +-
 .../apache/geode/connectors/jdbc/JDBCLoader.java   |   18 +-
 .../connectors/jdbc/JDBCSynchronousWriter.java     |   21 +-
 .../connectors/jdbc/internal/ColumnValue.java      |   14 +-
 .../jdbc/internal/JDBCConfiguration.java           |  226 ----
 .../jdbc/internal/JDBCConfigurationService.java    |   40 +
 .../jdbc/internal/JDBCConnectionConfiguration.java |   54 +
 .../connectors/jdbc/internal/JDBCManager.java      |  381 ++-----
 .../jdbc/internal/JDBCPropertyParser.java          |   47 -
 .../jdbc/internal/JDBCRegionMapping.java           |   75 ++
 .../jdbc/internal/PreparedStatementCache.java      |  103 ++
 .../geode/connectors/jdbc/internal/SQLHandler.java |  182 ++++
 .../jdbc/internal/SqlStatementFactory.java         |   77 ++
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       |   11 +-
 .../connectors/jdbc/JDBCLoaderIntegrationTest.java |    7 +-
 .../jdbc/JDBCSynchronousWriterIntegrationTest.java |    8 +-
 .../connectors/jdbc/internal/ColumnValueTest.java  |   52 +
 .../internal/JDBCConfigurationServiceTest.java     |   73 ++
 .../jdbc/internal/JDBCConfigurationUnitTest.java   |  189 ----
 .../internal/JDBCConnectionConfigurationTest.java  |   58 +
 .../jdbc/internal/JDBCManagerUnitTest.java         | 1127 +++++++++-----------
 .../jdbc/internal/JDBCPropertyParserTest.java      |   64 --
 .../jdbc/internal/JDBCRegionMappingTest.java       |   80 ++
 .../jdbc/internal/PreparedStatementCacheTest.java  |   79 ++
 .../connectors/jdbc/internal/SQLHandlerTest.java   |  361 +++++++
 .../jdbc/internal/SqlStatementFactoryTest.java     |   76 ++
 .../jdbc/internal/TestConfigService.java           |   45 +
 27 files changed, 2004 insertions(+), 1483 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
index 771aec1..1050e92 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -18,13 +18,11 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.geode.CopyHelper;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.SQLHandler;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.logging.log4j.Logger;
@@ -39,6 +37,13 @@ public class JDBCAsyncWriter implements AsyncEventListener {
   private long totalEvents = 0;
   private long successfulEvents = 0;
   private JDBCManager manager;
+  private SQLHandler sqlHandler;
+
+  // Constructor for test purposes only
+  JDBCAsyncWriter(JDBCManager manager) {
+    this.manager = manager;
+    sqlHandler = new SQLHandler(manager);
+  }
 
   @Override
   public void close() {
@@ -69,7 +74,7 @@ public class JDBCAsyncWriter implements AsyncEventListener {
     try {
       for (AsyncEvent event : events) {
         try {
-          this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+          sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
               getPdxInstance(event));
           changeSuccessfulEvents(1);
         } catch (RuntimeException ex) {
@@ -85,8 +90,10 @@ public class JDBCAsyncWriter implements AsyncEventListener {
 
   @Override
   public void init(Properties props) {
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    this.manager = new JDBCManager(config);
+    /*
+     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
+     * JDBCManager(config);
+     */
   }
 
   private synchronized void changeTotalEvents(long delta) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java
index c6247bd..085272c 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java
@@ -15,13 +15,13 @@
 package org.apache.geode.connectors.jdbc;
 
 import org.apache.geode.cache.LoaderHelper;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JDBCManager;
 
 import java.util.Properties;
 
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.connectors.jdbc.internal.SQLHandler;
 
 /*
  * This class provides loading from a data source using JDBC.
@@ -30,6 +30,13 @@ import org.apache.geode.cache.CacheLoaderException;
  */
 public class JDBCLoader<K, V> implements CacheLoader<K, V> {
   private JDBCManager manager;
+  private SQLHandler sqlHandler;
+
+  // Constructor for test purposes only
+  JDBCLoader(JDBCManager manager) {
+    this.manager = manager;
+    sqlHandler = new SQLHandler(manager);
+  }
 
   @Override
   public void close() {
@@ -47,11 +54,14 @@ public class JDBCLoader<K, V> implements CacheLoader<K, V> {
   public V load(LoaderHelper<K, V> helper) throws CacheLoaderException {
     // The following cast to V is to keep the compiler happy
     // but is erased at runtime and no actual cast happens.
-    return (V) this.manager.read(helper.getRegion(), helper.getKey());
+    return (V) sqlHandler.read(helper.getRegion(), helper.getKey());
   }
 
   public void init(Properties props) {
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    this.manager = new JDBCManager(config);
+    /*
+     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
+     * JDBCManager(config);
+     */
+    // TODO: make this get the ConfigurationService?
   };
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java
index ad741f0..a77c95b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java
@@ -23,8 +23,8 @@ import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.SQLHandler;
 import org.apache.geode.pdx.PdxInstance;
 
 /*
@@ -34,11 +34,20 @@ import org.apache.geode.pdx.PdxInstance;
  */
 public class JDBCSynchronousWriter<K, V> implements CacheWriter<K, V> {
   private JDBCManager manager;
+  private SQLHandler sqlHandler;
+
+  // Constructor for test purposes only
+  JDBCSynchronousWriter(JDBCManager manager) {
+    this.manager = manager;
+    sqlHandler = new SQLHandler(manager);
+  }
 
   @Override
   public void init(Properties props) {
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    this.manager = new JDBCManager(config);
+    /*
+     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
+     * JDBCManager(config);
+     */
   }
 
   @Override
@@ -69,19 +78,19 @@ public class JDBCSynchronousWriter<K, V> implements CacheWriter<K, V> {
 
   @Override
   public void beforeUpdate(EntryEvent<K, V> event) throws CacheWriterException {
-    this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
   @Override
   public void beforeCreate(EntryEvent<K, V> event) throws CacheWriterException {
-    this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
   @Override
   public void beforeDestroy(EntryEvent<K, V> event) throws CacheWriterException {
-    this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
index 1e7803e..8ade14b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
@@ -19,22 +19,22 @@ public class ColumnValue {
   final private String columnName;
   final private Object value;
 
-  public ColumnValue(boolean isKey, String columnName, Object value) {
+  ColumnValue(boolean isKey, String columnName, Object value) {
     this.isKey = isKey;
     this.columnName = columnName;
     this.value = value;
   }
 
-  public boolean isKey() {
-    return this.isKey;
+  boolean isKey() {
+    return isKey;
   }
 
-  public String getColumnName() {
-    return this.columnName;
+  String getColumnName() {
+    return columnName;
   }
 
-  public Object getValue() {
-    return this.value;
+  Object getValue() {
+    return value;
   }
 
   @Override
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
deleted file mode 100644
index 9e14112..0000000
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Function;
-
-public class JDBCConfiguration {
-  private static final boolean DEFAULT_KEY_IN_VALUE = false; //TODO: determine what default is
-  private static final String URL = "url";
-  private static final String USER = "user";
-  private static final String PASSWORD = "password";
-  private static final String JDBC_SEPARATOR = System.getProperty("jdbcSeparator", ":");
-  /**
-   * syntax: comma separated list of booleanSpecs. booleanSpec: optional regionSpec followed by
-   * boolean. regionSpec: regionName followed by a jdbcSeparator. A 'boolean' is parsed by
-   * {@link Boolean#parseBoolean(String)}. Whitespace is only allowed around the commas. At most one
-   * classSpec without a regionSpec is allowed. A classSpec without a regionSpec defines the
-   * default. Only used by JDBCLoader.
-   */
-  private static final String IS_KEY_PART_OF_VALUE = "isKeyPartOfValue";
-
-  /**
-   * syntax: comma separated list of classSpecs. classSpec: optional regionSpec followed by
-   * className. regionSpec: regionName followed by a jdbcSeparator. Whitespace is only allowed
-   * around the commas. At most one classSpec without a regionSpec is allowed. A classSpec without a
-   * regionSpec defines the default. Only used by JDBCLoader.
-   */
-  private static final String VALUE_CLASS_NAME = "valueClassName";
-
-  /**
-   * syntax: comma separated list of regionTableSpecs. regionTableSpecs: regionName followed by
-   * jdbcSeparator followed by tableName. Whitespace is only allowed around the commas.
-   */
-  private static final String REGION_TO_TABLE = "regionToTable";
-
-  /**
-   * syntax: comma separated list of fieldColumnSpecs. fieldColumnSpecs: Optional regionSpec
-   * followed by fieldName followed by jdbcSeparator followed by columnName. regionSpec: regionName
-   * followed by jdbcSeparator. Whitespace is only allowed around the commas.
-   */
-  private static final String FIELD_TO_COLUMN = "fieldToColumn";
-
-  private static final List<String> requiredProperties =
-      Collections.unmodifiableList(Arrays.asList(URL));
-
-  private final String url;
-  private final String user;
-  private final String password;
-  private final Map<String, String> regionToClassMap;
-  private final Map<String, Boolean> keyPartOfValueMap;
-  private final Map<String, String> regionToTableMap;
-  private final Map<RegionField, String> fieldToColumnMap;
-
-  public JDBCConfiguration(Properties configProps) {
-    validateRequiredProperties(configProps);
-    this.url = configProps.getProperty(URL);
-    this.user = configProps.getProperty(USER);
-    this.password = configProps.getProperty(PASSWORD);
-    JDBCPropertyParser parser = new JDBCPropertyParser(configProps);
-    this.regionToClassMap = parser.getPropertiesMap(VALUE_CLASS_NAME, v -> v);
-    this.keyPartOfValueMap = parser.getPropertiesMap(IS_KEY_PART_OF_VALUE, Boolean::parseBoolean);
-    this.regionToTableMap = parser.getPropertiesMap(REGION_TO_TABLE, v -> v);
-    this.fieldToColumnMap = computeFieldToColumnMap(configProps.getProperty(FIELD_TO_COLUMN));
-  }
-
-  private Map<RegionField, String> computeFieldToColumnMap(String prop) {
-    Function<String, RegionField> regionFieldParser = new Function<String, RegionField>() {
-      @Override
-      public RegionField apply(String item) {
-        String regionName = null;
-        String fieldName;
-        int idx = item.indexOf(getjdbcSeparator());
-        if (idx != -1) {
-          regionName = item.substring(0, idx);
-          fieldName = item.substring(idx + getjdbcSeparator().length());
-        } else {
-          fieldName = item;
-        }
-        return new RegionField(regionName, fieldName);
-      }
-    };
-    return parseMap(prop, regionFieldParser, v -> v, true);
-  }
-
-  private <K, V> Map<K, V> parseMap(String propertyValue, Function<String, K> keyParser,
-      Function<String, V> valueParser, boolean failOnNoSeparator) {
-    if (propertyValue == null) {
-      return null;
-    }
-    Map<K, V> result = new HashMap<>();
-    List<String> items = Arrays.asList(propertyValue.split("\\s*,\\s*"));
-    for (String item : items) {
-      int idx = item.lastIndexOf(getjdbcSeparator());
-      if (idx == -1) {
-        if (failOnNoSeparator) {
-          throw new IllegalArgumentException(item + " does not contain " + getjdbcSeparator());
-        }
-        continue;
-      }
-      String keyString = item.substring(0, idx);
-      String valueString = item.substring(idx + getjdbcSeparator().length());
-      K key = keyParser.apply(keyString);
-      if (result.containsKey(key)) {
-        throw new IllegalArgumentException("Duplicate item " + key + " is not allowed.");
-      }
-      result.put(key, valueParser.apply(valueString));
-    }
-    return result;
-  }
-
-  private void validateRequiredProperties(Properties configProps) {
-    List<String> reqKeys = new ArrayList<>(requiredProperties);
-    reqKeys.removeAll(configProps.stringPropertyNames());
-    if (!reqKeys.isEmpty()) {
-      Collections.sort(reqKeys);
-      throw new IllegalArgumentException("missing required properties: " + reqKeys);
-    }
-  }
-
-  String getURL() {
-    return this.url;
-  }
-
-  String getUser() {
-    return this.user;
-  }
-
-  String getPassword() {
-    return this.password;
-  }
-
-  String getValueClassName(String regionName) {
-    return regionToClassMap.get(regionName);
-  }
-
-  boolean getIsKeyPartOfValue(String regionName) {
-    Boolean result = this.keyPartOfValueMap.get(regionName);
-    return result != null ? result : DEFAULT_KEY_IN_VALUE;
-  }
-
-  String getjdbcSeparator() {
-    return JDBC_SEPARATOR;
-  }
-
-  String getTableForRegion(String regionName) {
-    if (this.regionToTableMap == null) {
-      return regionName;
-    }
-    String result = this.regionToTableMap.get(regionName);
-    if (result == null) {
-      result = regionName;
-    }
-    return result;
-  }
-
-  String getColumnForRegionField(String regionName, String fieldName) {
-    if (this.fieldToColumnMap == null) {
-      return fieldName;
-    }
-    RegionField key = new RegionField(regionName, fieldName);
-    String result = this.fieldToColumnMap.get(key);
-    if (result == null) {
-      key = new RegionField(null, fieldName);
-      result = this.fieldToColumnMap.get(key);
-      if (result == null) {
-        result = regionName;
-      }
-    }
-    return result;
-  }
-
-  public static class RegionField {
-    private final String regionName; // may be null
-    private final String fieldName;
-
-    public RegionField(String regionName, String fieldName) {
-      this.regionName = regionName;
-      this.fieldName = fieldName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      RegionField that = (RegionField) o;
-
-      if (regionName != null ? !regionName.equals(that.regionName) : that.regionName != null) {
-        return false;
-      }
-      return fieldName.equals(that.fieldName);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = regionName != null ? regionName.hashCode() : 0;
-      result = 31 * result + fieldName.hashCode();
-      return result;
-    }
-  }
-
-}
-
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java
new file mode 100644
index 0000000..0509566
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class JDBCConfigurationService {
+
+  private final Map<String, JDBCConnectionConfiguration> connectionsByName =
+      new ConcurrentHashMap<>();
+  private final Map<String, JDBCRegionMapping> mappingsByRegion = new ConcurrentHashMap<>();
+
+  JDBCConnectionConfiguration getConnectionConfig(String connectionName) {
+    return connectionsByName.get(connectionName);
+  }
+
+  JDBCRegionMapping getMappingForRegion(String regionName) {
+    return mappingsByRegion.get(regionName);
+  }
+
+  void addOrUpdateConnectionConfig(JDBCConnectionConfiguration config) {
+    connectionsByName.put(config.getName(), config);
+  }
+
+  void addOrUpdateRegionMapping(JDBCRegionMapping mapping) {
+    mappingsByRegion.put(mapping.getRegionName(), mapping);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java
new file mode 100644
index 0000000..c75aefe
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+class JDBCConnectionConfiguration {
+
+  private String name;
+  private String url;
+  private String user;
+  private String password;
+
+  String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+
+  String getUrl() {
+    return url;
+  }
+
+  void setUrl(String url) {
+    this.url = url;
+  }
+
+  String getUser() {
+    return user;
+  }
+
+  void setUser(String user) {
+    this.user = user;
+  }
+
+  String getPassword() {
+    return password;
+  }
+
+  void setPassword(String password) {
+    this.password = password;
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
index e1d7cad..be82e22 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
@@ -23,233 +23,34 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxInstanceFactory;
-import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class JDBCManager {
 
-  private final JDBCConfiguration config;
+  private final JDBCConfigurationService configService;
 
-  private Connection conn;
+  private Map<String, Connection> connectionMap = new ConcurrentHashMap<>();
 
   private final ConcurrentMap<String, String> tableToPrimaryKeyMap = new ConcurrentHashMap<>();
 
-  private final ThreadLocal<Map<StatementKey, PreparedStatement>> preparedStatementCache =
-      new ThreadLocal<>();
+  private final ThreadLocal<PreparedStatementCache> preparedStatementCache = new ThreadLocal<>();
 
-  private Map<StatementKey, PreparedStatement> getPreparedStatementCache() {
-    Map<StatementKey, PreparedStatement> result = preparedStatementCache.get();
-    if (result == null) {
-      result = new HashMap<>();
-      preparedStatementCache.set(result);
-    }
-    return result;
-  }
-
-  public JDBCManager(JDBCConfiguration config) {
-    this.config = config;
-  }
-
-  private String getRegionName(Region r) {
-    return r.getName();
-  }
-
-  public PdxInstance read(Region region, Object key) {
-    final String regionName = getRegionName(region);
-    final String tableName = getTableName(regionName);
-    List<ColumnValue> columnList =
-        getColumnToValueList(regionName, tableName, key, null, Operation.GET);
-    PreparedStatement pstmt = getPreparedStatement(columnList, tableName, Operation.GET, 0);
-    synchronized (pstmt) {
-      try {
-        int idx = 0;
-        for (ColumnValue cv : columnList) {
-          idx++;
-          pstmt.setObject(idx, cv.getValue());
-        }
-        ResultSet rs = pstmt.executeQuery();
-        if (rs.next()) {
-          InternalCache cache = (InternalCache) region.getRegionService();
-          String valueClassName = getValueClassName(regionName);
-          PdxInstanceFactory factory;
-          if (valueClassName != null) {
-            factory = cache.createPdxInstanceFactory(valueClassName);
-          } else {
-            factory = cache.createPdxInstanceFactory("no class", false);
-          }
-          ResultSetMetaData rsmd = rs.getMetaData();
-          int ColumnsNumber = rsmd.getColumnCount();
-          String keyColumnName = getKeyColumnName(tableName);
-          for (int i = 1; i <= ColumnsNumber; i++) {
-            Object columnValue = rs.getObject(i);
-            String columnName = rsmd.getColumnName(i);
-            String fieldName = mapColumnNameToFieldName(columnName, tableName);
-            if (!isFieldExcluded(fieldName)
-                && (isKeyPartOfValue(regionName) || !keyColumnName.equalsIgnoreCase(columnName))) {
-              factory.writeField(fieldName, columnValue, Object.class);
-            }
-          }
-          if (rs.next()) {
-            throw new IllegalStateException(
-                "Multiple rows returned for key " + key + " on table " + tableName);
-          }
-          return factory.create();
-        } else {
-          return null;
-        }
-      } catch (SQLException e) {
-        handleSQLException(e);
-        return null; // this is never reached
-      } finally {
-        clearStatementParameters(pstmt);
-      }
-    }
-  }
-
-  private String getValueClassName(String regionName) {
-    return this.config.getValueClassName(regionName);
-  }
-
-  public void write(Region region, Operation operation, Object key, PdxInstance value) {
-    final String regionName = getRegionName(region);
-    final String tableName = getTableName(regionName);
-    int pdxTypeId = 0;
-    if (value != null) {
-      pdxTypeId = ((PdxInstanceImpl) value).getPdxType().getTypeId();
-    }
-    List<ColumnValue> columnList =
-        getColumnToValueList(regionName, tableName, key, value, operation);
-    int updateCount = executeWrite(columnList, tableName, operation, pdxTypeId, false);
-    if (operation.isDestroy()) {
-      // TODO: should we check updateCount here? Probably not. It is possible we have nothing in the
-      // table to destroy.
-      return;
-    }
-    if (updateCount <= 0) {
-      Operation upsertOp;
-      if (operation.isUpdate()) {
-        upsertOp = Operation.CREATE;
-      } else {
-        upsertOp = Operation.UPDATE;
-      }
-      updateCount = executeWrite(columnList, tableName, upsertOp, pdxTypeId, true);
-    }
-    if (updateCount != 1) {
-      throw new IllegalStateException("Unexpected updateCount " + updateCount);
-    }
+  public JDBCManager(JDBCConfigurationService configService) {
+    this.configService = configService;
   }
 
-  private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
-      int pdxTypeId, boolean handleException) {
-    PreparedStatement pstmt = getPreparedStatement(columnList, tableName, operation, pdxTypeId);
-    synchronized (pstmt) {
-      try {
-        int idx = 0;
-        for (ColumnValue cv : columnList) {
-          idx++;
-          pstmt.setObject(idx, cv.getValue());
-        }
-        pstmt.execute();
-        return pstmt.getUpdateCount();
-      } catch (SQLException e) {
-        if (handleException || operation.isDestroy()) {
-          handleSQLException(e);
-        }
-        return 0;
-      } finally {
-        clearStatementParameters(pstmt);
-      }
-    }
+  JDBCRegionMapping getMappingForRegion(String regionName) {
+    return configService.getMappingForRegion(regionName);
   }
 
-  private void clearStatementParameters(PreparedStatement ps) {
-    try {
-      ps.clearParameters();
-    } catch (SQLException ignore) {
-    }
-  }
-
-  private String getSqlString(String tableName, List<ColumnValue> columnList, Operation operation) {
-    if (operation.isCreate()) {
-      return getInsertSqlString(tableName, columnList);
-    } else if (operation.isUpdate()) {
-      return getUpdateSqlString(tableName, columnList);
-    } else if (operation.isDestroy()) {
-      return getDestroySqlString(tableName, columnList);
-    } else if (operation.isGet()) {
-      return getSelectQueryString(tableName, columnList);
-    } else {
-      throw new IllegalStateException("unsupported operation " + operation);
-    }
-  }
-
-  private String getSelectQueryString(String tableName, List<ColumnValue> columnList) {
-    assert columnList.size() == 1;
-    ColumnValue keyCV = columnList.get(0);
-    assert keyCV.isKey();
-    StringBuilder query = new StringBuilder(
-        "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?");
-    return query.toString();
-  }
-
-  private String getDestroySqlString(String tableName, List<ColumnValue> columnList) {
-    assert columnList.size() == 1;
-    ColumnValue keyCV = columnList.get(0);
-    assert keyCV.isKey();
-    StringBuilder query =
-        new StringBuilder("DELETE FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?");
-    return query.toString();
-  }
-
-  private String getUpdateSqlString(String tableName, List<ColumnValue> columnList) {
-    StringBuilder query = new StringBuilder("UPDATE " + tableName + " SET ");
-    int idx = 0;
-    for (ColumnValue cv : columnList) {
-      if (cv.isKey()) {
-        query.append(" WHERE ");
-      } else {
-        idx++;
-        if (idx > 1) {
-          query.append(", ");
-        }
-      }
-      query.append(cv.getColumnName());
-      query.append(" = ?");
-    }
-    return query.toString();
-  }
-
-  private String getInsertSqlString(String tableName, List<ColumnValue> columnList) {
-    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + '(');
-    StringBuilder columnValues = new StringBuilder(" VALUES (");
-    int columnCount = columnList.size();
-    int idx = 0;
-    for (ColumnValue cv : columnList) {
-      idx++;
-      columnNames.append(cv.getColumnName());
-      columnValues.append('?');
-      if (idx != columnCount) {
-        columnNames.append(", ");
-        columnValues.append(",");
-      }
-    }
-    columnNames.append(")");
-    columnValues.append(")");
-    return columnNames.append(columnValues).toString();
-  }
-
-  Connection getConnection(String user, String password) {
-    Connection result = this.conn;
+  Connection getConnection(JDBCConnectionConfiguration config) {
+    Connection result = connectionMap.get(config.getName());
     try {
       if (result != null && !result.isClosed()) {
         return result;
@@ -257,84 +58,26 @@ public class JDBCManager {
     } catch (SQLException ignore) {
       // If isClosed throws fall through and connect again
     }
-
+    // TODO: make thread safe by synchronizing on map of connections
     try {
-      result =
-          createConnection(this.config.getURL(), this.config.getUser(), this.config.getPassword());
+      result = getSQLConnection(config);
     } catch (SQLException e) {
       // TODO: consider a different exception
-      throw new IllegalStateException("Could not connect to " + this.config.getURL(), e);
+      throw new IllegalStateException("Could not connect to " + config.getUrl(), e);
     }
-    this.conn = result;
+    connectionMap.put(config.getName(), result);
     return result;
   }
 
-  protected Connection createConnection(String url, String user, String password)
-      throws SQLException {
-    return DriverManager.getConnection(url);
+  // package protected for testing purposes only
+  Connection getSQLConnection(JDBCConnectionConfiguration config) throws SQLException {
+    return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
   }
 
-  private static class StatementKey {
-    private final int pdxTypeId;
-    private final Operation operation;
-    private final String tableName;
-
-    public StatementKey(int pdxTypeId, Operation operation, String tableName) {
-      this.pdxTypeId = pdxTypeId;
-      this.operation = operation;
-      this.tableName = tableName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      StatementKey that = (StatementKey) o;
-
-      if (pdxTypeId != that.pdxTypeId) {
-        return false;
-      }
-      if (operation != null ? !operation.equals(that.operation) : that.operation != null) {
-        return false;
-      }
-      return tableName != null ? tableName.equals(that.tableName) : that.tableName == null;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = pdxTypeId;
-      result = 31 * result + (operation != null ? operation.hashCode() : 0);
-      result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
-      return result;
-    }
-  }
-
-  private PreparedStatement getPreparedStatement(List<ColumnValue> columnList, String tableName,
-      Operation operation, int pdxTypeId) {
-    System.out.println("getPreparedStatement : " + pdxTypeId + "operation: " + operation
-        + " columns: " + columnList);
-    StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
-    return getPreparedStatementCache().computeIfAbsent(key, k -> {
-      String sqlStr = getSqlString(tableName, columnList, operation);
-      System.out.println("sql=" + sqlStr); // TODO remove debugging
-      Connection con = getConnection(null, null);
-      try {
-        return con.prepareStatement(sqlStr);
-      } catch (SQLException e) {
-        handleSQLException(e);
-        return null; // this line is never reached
-      }
-    });
-  }
-
-  private List<ColumnValue> getColumnToValueList(String regionName, String tableName, Object key,
-      PdxInstance value, Operation operation) {
-    String keyColumnName = getKeyColumnName(tableName);
+  // TODO write unit tests
+  List<ColumnValue> getColumnToValueList(JDBCConnectionConfiguration config,
+      JDBCRegionMapping regionMapping, Object key, PdxInstance value, Operation operation) {
+    String keyColumnName = getKeyColumnName(config, regionMapping.getTableName());
     ColumnValue keyCV = new ColumnValue(true, keyColumnName, key);
     if (operation.isDestroy() || operation.isGet()) {
       return Collections.singletonList(keyCV);
@@ -343,55 +86,40 @@ public class JDBCManager {
     List<String> fieldNames = value.getFieldNames();
     List<ColumnValue> result = new ArrayList<>(fieldNames.size() + 1);
     for (String fieldName : fieldNames) {
-      if (isFieldExcluded(fieldName)) {
-        continue;
-      }
-      String columnName = mapFieldNameToColumnName(regionName, fieldName);
+      String columnName = regionMapping.getColumnNameForField(fieldName);
       if (columnName.equalsIgnoreCase(keyColumnName)) {
         continue;
       }
       Object columnValue = value.getField(fieldName);
       ColumnValue cv = new ColumnValue(false, columnName, columnValue);
-      // TODO: any need to order the items in the list?
       result.add(cv);
     }
     result.add(keyCV);
     return result;
   }
 
-  private boolean isFieldExcluded(String fieldName) {
-    // TODO check configuration
-    return false;
-  }
-
-  private String mapFieldNameToColumnName(String regionName, String fieldName) {
-    return this.config.getColumnForRegionField(regionName, fieldName);
-  }
-
   private String mapColumnNameToFieldName(String columnName, String tableName) {
     // TODO check config for mapping
     return columnName.toLowerCase();
   }
 
-  private boolean isKeyPartOfValue(String regionName) {
-    return this.config.getIsKeyPartOfValue(regionName);
-  }
-
-  private String getKeyColumnName(String tableName) {
+  String getKeyColumnName(JDBCConnectionConfiguration connectionConfig, String tableName) {
     return tableToPrimaryKeyMap.computeIfAbsent(tableName, k -> {
-      return computeKeyColumnName(k);
+      return computeKeyColumnName(connectionConfig, k);
     });
   }
 
-  String computeKeyColumnName(String tableName) {
+  private String computeKeyColumnName(JDBCConnectionConfiguration connectionConfig,
+      String tableName) {
     // TODO: check config for key column
-    Connection con = getConnection(null, null);
+    Connection connection = getConnection(connectionConfig);
+    String key;
     try {
-      DatabaseMetaData metaData = con.getMetaData();
-      ResultSet tablesRS = metaData.getTables(null, null, "%", null);
+      DatabaseMetaData metaData = connection.getMetaData();
+      ResultSet tables = metaData.getTables(null, null, "%", null);
       String realTableName = null;
-      while (tablesRS.next()) {
-        String name = tablesRS.getString("TABLE_NAME");
+      while (tables.next()) {
+        String name = tables.getString("TABLE_NAME");
         if (name.equalsIgnoreCase(tableName)) {
           if (realTableName != null) {
             throw new IllegalStateException("Duplicate tables that match region name");
@@ -407,39 +135,35 @@ public class JDBCManager {
         throw new IllegalStateException(
             "The table " + tableName + " does not have a primary key column.");
       }
-      String key = primaryKeys.getString("COLUMN_NAME");
+      key = primaryKeys.getString("COLUMN_NAME");
       if (primaryKeys.next()) {
         throw new IllegalStateException(
             "The table " + tableName + " has more than one primary key column.");
       }
-      return key;
     } catch (SQLException e) {
       handleSQLException(e);
-      return null; // never reached
+      key = null; // never reached
     }
+    return key;
   }
 
   private void handleSQLException(SQLException e) {
     throw new IllegalStateException("NYI: handleSQLException", e);
   }
 
-  private String getTableName(String regionName) {
-    return config.getTableForRegion(regionName);
-  }
-
-  private void printResultSet(ResultSet rs) {
+  private void printResultSet(ResultSet resultSets) {
     System.out.println("Printing ResultSet:");
     try {
       int size = 0;
-      ResultSetMetaData rsmd = rs.getMetaData();
-      int columnsNumber = rsmd.getColumnCount();
-      while (rs.next()) {
+      ResultSetMetaData metaData = resultSets.getMetaData();
+      int columnsNumber = metaData.getColumnCount();
+      while (resultSets.next()) {
         size++;
         for (int i = 1; i <= columnsNumber; i++) {
           if (i > 1)
             System.out.print(",  ");
-          String columnValue = rs.getString(i);
-          System.out.print(rsmd.getColumnName(i) + ": " + columnValue);
+          String columnValue = resultSets.getString(i);
+          System.out.print(metaData.getColumnName(i) + ": " + columnValue);
         }
         System.out.println("");
       }
@@ -448,7 +172,7 @@ public class JDBCManager {
       System.out.println("Exception while printing result set" + ex);
     } finally {
       try {
-        rs.beforeFirst();
+        resultSets.beforeFirst();
       } catch (SQLException e) {
         System.out.println("Exception while calling beforeFirst" + e);
       }
@@ -456,11 +180,30 @@ public class JDBCManager {
   }
 
   public void close() {
-    if (this.conn != null) {
+    connectionMap.values().forEach(connection -> close(connection));
+  }
+
+  private void close(Connection connection) {
+    if (connection != null) {
       try {
-        this.conn.close();
+        connection.close();
       } catch (SQLException ignore) {
       }
     }
   }
+
+  JDBCConnectionConfiguration getConnectionConfig(String connectionConfigName) {
+    return configService.getConnectionConfig(connectionConfigName);
+  }
+
+  PreparedStatement getPreparedStatement(Connection connection, List<ColumnValue> columnList,
+      String tableName, Operation operation, int pdxTypeId) {
+    PreparedStatementCache statementCache = preparedStatementCache.get();
+    if (statementCache == null) {
+      statementCache = new PreparedStatementCache();
+      preparedStatementCache.set(statementCache);
+    }
+    return statementCache.getPreparedStatement(connection, columnList, tableName, operation,
+        pdxTypeId);
+  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java
deleted file mode 100644
index 167a079..0000000
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- *  * agreements. See the NOTICE file distributed with this work for additional information regarding
- *  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance with the License. You may obtain a
- *  * copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software distributed under the License
- *  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- *  * or implied. See the License for the specific language governing permissions and limitations under
- *  * the License.
- *
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Function;
-
-class JDBCPropertyParser {
-  private final Properties properties;
-
-  private static final String PROPERTY_PREFIX_SEPARATOR = "-";
-  private static final String COMPOUND_VALUE_SEPARATOR = ":";
-
-
-  JDBCPropertyParser(Properties properties) {
-    this.properties = properties;
-  }
-
-  <V> Map<String, V> getPropertiesMap(String propertyPrefix, Function<String, V> valueParser) {
-    Map<String, V> map = new HashMap<>();
-    for(String propertyName : properties.stringPropertyNames()) {
-      if (propertyName.startsWith(propertyPrefix)) {
-        V value = valueParser.apply(properties.getProperty(propertyName));
-        int prefixEnd = propertyName.indexOf(PROPERTY_PREFIX_SEPARATOR);
-        String key = propertyName.substring(prefixEnd + 1);
-        map.put(key, value);
-      }
-    }
-    return map;
-  }
-}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java
new file mode 100644
index 0000000..ba8fe6f
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class JDBCRegionMapping {
+  private String regionName;
+  private String pdxClassName;
+  private String tableName;
+  private String connectionConfigName;
+  private boolean primaryKeyInValue;
+  private Map<String, String> fieldToColumnMap = new HashMap<>();
+
+  void setConnectionConfigName(String connectionConfigName) {
+    this.connectionConfigName = connectionConfigName;
+  }
+
+  String getConnectionConfigName() {
+    return connectionConfigName;
+  }
+
+  String getRegionName() {
+    return regionName;
+  }
+
+  void setRegionName(String regionName) {
+    this.regionName = regionName;
+  }
+
+  String getPdxClassName() {
+    return pdxClassName;
+  }
+
+  void setPdxClassName(String pdxClassName) {
+    this.pdxClassName = pdxClassName;
+  }
+
+  String getTableName() {
+    return tableName;
+  }
+
+  void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  boolean isPrimaryKeyInValue() {
+    return primaryKeyInValue;
+  }
+
+  void setPrimaryKeyInValue(String primaryKeyInValue) {
+    this.primaryKeyInValue = Boolean.parseBoolean(primaryKeyInValue);
+  }
+
+  String getColumnNameForField(String fieldName) {
+    String columnName = fieldToColumnMap.get(fieldName);
+    return columnName != null ? columnName : fieldName;
+  }
+
+  void addFieldToColumnMapping(String fieldName, String columnMapping) {
+    fieldToColumnMap.put(fieldName, columnMapping);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
new file mode 100644
index 0000000..7758a7e
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.geode.cache.Operation;
+
+class PreparedStatementCache {
+
+  private SqlStatementFactory statementFactory = new SqlStatementFactory();
+  // TODO: if connection lost, we will still keep the statement. Make LRU?
+  private Map<StatementKey, PreparedStatement> statements = new HashMap<>();
+
+  PreparedStatement getPreparedStatement(Connection connection, List<ColumnValue> columnList,
+      String tableName, Operation operation, int pdxTypeId) {
+    StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
+    return statements.computeIfAbsent(key, k -> {
+      String sqlStr = getSqlString(tableName, columnList, operation);
+      PreparedStatement statement = null;
+      try {
+        statement = connection.prepareStatement(sqlStr);
+      } catch (SQLException e) {
+        handleSQLException(e);
+      }
+      return statement;
+    });
+  }
+
+  private String getSqlString(String tableName, List<ColumnValue> columnList, Operation operation) {
+    if (operation.isCreate()) {
+      return statementFactory.createInsertSqlString(tableName, columnList);
+    } else if (operation.isUpdate()) {
+      return statementFactory.createUpdateSqlString(tableName, columnList);
+    } else if (operation.isDestroy()) {
+      return statementFactory.createDestroySqlString(tableName, columnList);
+    } else if (operation.isGet()) {
+      return statementFactory.createSelectQueryString(tableName, columnList);
+    } else {
+      throw new IllegalArgumentException("unsupported operation " + operation);
+    }
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+
+  private static class StatementKey {
+    private final int pdxTypeId;
+    private final Operation operation;
+    private final String tableName;
+
+    StatementKey(int pdxTypeId, Operation operation, String tableName) {
+      this.pdxTypeId = pdxTypeId;
+      this.operation = operation;
+      this.tableName = tableName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      StatementKey that = (StatementKey) o;
+
+      if (pdxTypeId != that.pdxTypeId) {
+        return false;
+      }
+      if (operation != null ? !operation.equals(that.operation) : that.operation != null) {
+        return false;
+      }
+      return tableName != null ? tableName.equals(that.tableName) : that.tableName == null;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = pdxTypeId;
+      result = 31 * result + (operation != null ? operation.hashCode() : 0);
+      result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+      return result;
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java
new file mode 100644
index 0000000..a214968
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+
+public class SQLHandler {
+  private JDBCManager manager;
+
+  public SQLHandler(JDBCManager manager) {
+    this.manager = manager;
+  }
+
+  public PdxInstance read(Region region, Object key) {
+    if (key == null) {
+      throw new IllegalArgumentException("Key for query cannot be null");
+    }
+
+    JDBCRegionMapping regionMapping = manager.getMappingForRegion(region.getName());
+    JDBCConnectionConfiguration connectionConfig =
+        manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+
+    List<ColumnValue> columnList =
+        manager.getColumnToValueList(connectionConfig, regionMapping, key, null, Operation.GET);
+    String tableName = regionMapping.getTableName();
+    PreparedStatement statement = manager.getPreparedStatement(
+        manager.getConnection(connectionConfig), columnList, tableName, Operation.GET, 0);
+    PdxInstanceFactory factory = getPdxInstanceFactory(region, regionMapping);
+    String keyColumnName = manager.getKeyColumnName(connectionConfig, tableName);
+    return executeReadStatement(statement, columnList, factory, regionMapping, keyColumnName);
+  }
+
+  private PdxInstanceFactory getPdxInstanceFactory(Region region, JDBCRegionMapping regionMapping) {
+    InternalCache cache = (InternalCache) region.getRegionService();
+    String valueClassName = regionMapping.getPdxClassName();
+    PdxInstanceFactory factory;
+    if (valueClassName != null) {
+      factory = cache.createPdxInstanceFactory(valueClassName);
+    } else {
+      factory = cache.createPdxInstanceFactory("no class", false);
+    }
+    return factory;
+  }
+
+  private PdxInstance executeReadStatement(PreparedStatement statement,
+      List<ColumnValue> columnList, PdxInstanceFactory factory, JDBCRegionMapping regionMapping,
+      String keyColumnName) {
+    PdxInstance pdxInstance = null;
+    synchronized (statement) {
+      try {
+        int idx = 0;
+        for (ColumnValue columnValue : columnList) {
+          idx++;
+          statement.setObject(idx, columnValue.getValue());
+        }
+        ResultSet resultSet = statement.executeQuery();
+        if (resultSet.next()) {
+
+          ResultSetMetaData metaData = resultSet.getMetaData();
+          int ColumnsNumber = metaData.getColumnCount();
+          for (int i = 1; i <= ColumnsNumber; i++) {
+            Object columnValue = resultSet.getObject(i);
+            String columnName = metaData.getColumnName(i);
+            String fieldName = mapColumnNameToFieldName(columnName, regionMapping.getTableName());
+            if (regionMapping.isPrimaryKeyInValue()
+                || !keyColumnName.equalsIgnoreCase(columnName)) {
+              factory.writeField(fieldName, columnValue, Object.class);
+            }
+          }
+          if (resultSet.next()) {
+            throw new IllegalStateException(
+                "Multiple rows returned for query: " + resultSet.getStatement().toString());
+          }
+          pdxInstance = factory.create();
+        }
+      } catch (SQLException e) {
+        handleSQLException(e);
+      } finally {
+        clearStatementParameters(statement);
+      }
+    }
+    return pdxInstance;
+  }
+
+  private String mapColumnNameToFieldName(String columnName, String tableName) {
+    // TODO check config for mapping
+    return columnName.toLowerCase();
+  }
+
+  public void write(Region region, Operation operation, Object key, PdxInstance value) {
+    if (value == null && operation != Operation.DESTROY) {
+      throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
+    }
+    JDBCRegionMapping regionMapping = manager.getMappingForRegion(region.getName());
+    final String tableName = regionMapping.getTableName();
+    JDBCConnectionConfiguration connectionConfig =
+        manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+    List<ColumnValue> columnList =
+        manager.getColumnToValueList(connectionConfig, regionMapping, key, value, operation);
+
+    int pdxTypeId = 0;
+    if (value != null) {
+      pdxTypeId = ((PdxInstanceImpl) value).getPdxType().getTypeId();
+    }
+    PreparedStatement statement = manager.getPreparedStatement(
+        manager.getConnection(connectionConfig), columnList, tableName, operation, pdxTypeId);
+    int updateCount = executeWriteStatement(statement, columnList, operation, false);
+    if (operation.isDestroy()) {
+      // TODO: should we check updateCount here? Probably not. It is possible we have nothing in the
+      // table to destroy.
+      return;
+    }
+    if (updateCount <= 0) {
+      Operation upsertOp;
+      if (operation.isUpdate()) {
+        upsertOp = Operation.CREATE;
+      } else {
+        upsertOp = Operation.UPDATE;
+      }
+      PreparedStatement upsertStatement = manager.getPreparedStatement(
+          manager.getConnection(connectionConfig), columnList, tableName, upsertOp, pdxTypeId);
+      updateCount = executeWriteStatement(upsertStatement, columnList, upsertOp, true);
+    }
+    if (updateCount != 1) {
+      throw new IllegalStateException("Unexpected updateCount " + updateCount);
+    }
+  }
+
+  private int executeWriteStatement(PreparedStatement statement, List<ColumnValue> columnList,
+      Operation operation, boolean handleException) {
+    synchronized (statement) {
+      try {
+        int idx = 0;
+        for (ColumnValue columnValue : columnList) {
+          idx++;
+          statement.setObject(idx, columnValue.getValue());
+        }
+        return statement.executeUpdate();
+      } catch (SQLException e) {
+        if (handleException || operation.isDestroy()) {
+          handleSQLException(e);
+        }
+        return 0;
+      } finally {
+        clearStatementParameters(statement);
+      }
+    }
+  }
+
+  private void clearStatementParameters(PreparedStatement statement) {
+    try {
+      statement.clearParameters();
+    } catch (SQLException ignore) {
+    }
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
new file mode 100644
index 0000000..1b0da56
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.List;
+
+class SqlStatementFactory {
+
+  String createSelectQueryString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    return "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+  }
+
+  String createDestroySqlString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    return "DELETE FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+  }
+
+  String createUpdateSqlString(String tableName, List<ColumnValue> columnList) {
+    StringBuilder query = new StringBuilder("UPDATE " + tableName + " SET ");
+    int idx = 0;
+    for (ColumnValue column : columnList) {
+      if (!column.isKey()) {
+        idx++;
+        if (idx > 1) {
+          query.append(", ");
+        }
+        query.append(column.getColumnName());
+        query.append(" = ?");
+      }
+    }
+    for (ColumnValue column : columnList) {
+      if (column.isKey()) {
+        query.append(" WHERE ");
+        query.append(column.getColumnName());
+        query.append(" = ?");
+        // currently only support simple primary key with one column
+        break;
+      }
+    }
+    return query.toString();
+  }
+
+  String createInsertSqlString(String tableName, List<ColumnValue> columnList) {
+    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + " (");
+    StringBuilder columnValues = new StringBuilder(" VALUES (");
+    int columnCount = columnList.size();
+    int idx = 0;
+    for (ColumnValue column : columnList) {
+      idx++;
+      columnNames.append(column.getColumnName());
+      columnValues.append('?');
+      if (idx != columnCount) {
+        columnNames.append(", ");
+        columnValues.append(",");
+      }
+    }
+    columnNames.append(")");
+    columnValues.append(")");
+    return columnNames.append(columnValues).toString();
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 966cde5..1a228a6 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -29,6 +29,9 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.JDBCAsyncWriter;
+import org.apache.geode.connectors.jdbc.internal.JDBCConfigurationService;
+import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxReader;
@@ -319,8 +322,8 @@ public class JDBCAsyncWriterIntegrationTest {
   }
 
   private Region createRegionWithJDBCAsyncWriter(String regionName, Properties props) {
-    jdbcWriter = new JDBCAsyncWriter();
-    jdbcWriter.init(props);
+    jdbcWriter = new JDBCAsyncWriter(createManager());
+    // jdbcWriter.init(props);
     cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
         .create("jdbcAsyncQueue", jdbcWriter);
 
@@ -351,4 +354,8 @@ public class JDBCAsyncWriterIntegrationTest {
     }
   }
 
+  private JDBCManager createManager() {
+    return new JDBCManager(TestConfigService.getTestConfigService());
+  }
+
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java
index 52314d0..ac9508c 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java
@@ -27,6 +27,8 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -90,7 +92,7 @@ public class JDBCLoaderIntegrationTest {
   }
 
   private Region createRegionWithJDBCLoader(String regionName, Properties props) {
-    this.jdbcLoader = new JDBCLoader<>();
+    this.jdbcLoader = new JDBCLoader<>(createManager());
     this.jdbcLoader.init(props);
     RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
     rf.setCacheLoader(jdbcLoader);
@@ -137,4 +139,7 @@ public class JDBCLoaderIntegrationTest {
     }
   }
 
+  private JDBCManager createManager() {
+    return new JDBCManager(TestConfigService.getTestConfigService());
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java
index fbdce89..a8162eb 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java
@@ -33,6 +33,8 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxReader;
@@ -318,7 +320,7 @@ public class JDBCSynchronousWriterIntegrationTest {
   }
 
   private Region createRegionWithJDBCSynchronousWriter(String regionName, Properties props) {
-    jdbcWriter = new JDBCSynchronousWriter();
+    jdbcWriter = new JDBCSynchronousWriter(createManager());
     jdbcWriter.init(props);
 
     RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
@@ -348,4 +350,8 @@ public class JDBCSynchronousWriterIntegrationTest {
     }
   }
 
+  private JDBCManager createManager() {
+    return new JDBCManager(TestConfigService.getTestConfigService());
+  }
+
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
new file mode 100644
index 0000000..6a75199
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ColumnValueTest {
+  private static final String COLUMN_NAME = "columnName";
+  private static final Object VALUE = new Object();
+
+  private ColumnValue value;
+
+  @Before
+  public void setup() {
+    value = new ColumnValue(true, COLUMN_NAME, VALUE);
+  }
+
+  @Test
+  public void isKeyReturnsCorrectValue() {
+    assertThat(value.isKey()).isTrue();
+  }
+
+  @Test
+  public void hasCorrectColumnName() {
+    assertThat(value.getColumnName()).isEqualTo(COLUMN_NAME);
+  }
+
+  @Test
+  public void hasCorrectValue() {
+    assertThat(value.getValue()).isSameAs(VALUE);
+  }
+
+  @Test
+  public void toStringContainsAllVariables() {
+    assertThat(value.toString()).contains(Boolean.toString(true)).contains(COLUMN_NAME)
+        .contains(VALUE.toString());
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java
new file mode 100644
index 0000000..ca1df04
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+public class JDBCConfigurationServiceTest {
+  private static final String TEST_CONFIG_NAME = "testConfig";
+  private static final String TEST_REGION_NAME = "testRegion";
+
+  private JDBCConfigurationService service = new JDBCConfigurationService();
+
+  @Test
+  public void returnsNoConfigIfEmpty() {
+    assertThat(service.getConnectionConfig("foo")).isNull();
+  }
+
+  @Test
+  public void returnsNoMappingIfEmpty() {
+    assertThat(service.getMappingForRegion("foo")).isNull();
+  }
+
+  @Test
+  public void returnsCorrectConfig() {
+    JDBCConnectionConfiguration config = mock(JDBCConnectionConfiguration.class);
+    when(config.getName()).thenReturn(TEST_CONFIG_NAME);
+    service.addOrUpdateConnectionConfig(config);
+
+    assertThat(service.getConnectionConfig(TEST_CONFIG_NAME)).isSameAs(config);
+  }
+
+  @Test
+  public void doesNotReturnConfigWithDifferentName() {
+    JDBCConnectionConfiguration config = mock(JDBCConnectionConfiguration.class);
+    when(config.getName()).thenReturn("theOtherConfig");
+    service.addOrUpdateConnectionConfig(config);
+
+    assertThat(service.getConnectionConfig(TEST_CONFIG_NAME)).isNull();
+  }
+
+  @Test
+  public void returnsCorrectMapping() {
+    JDBCRegionMapping mapping = mock(JDBCRegionMapping.class);
+    when(mapping.getRegionName()).thenReturn(TEST_REGION_NAME);
+    service.addOrUpdateRegionMapping(mapping);
+
+    assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isSameAs(mapping);
+  }
+
+  @Test
+  public void doesNotReturnMappingForDifferentRegion() {
+    JDBCRegionMapping mapping = mock(JDBCRegionMapping.class);
+    when(mapping.getRegionName()).thenReturn("theOtherMapping");
+    service.addOrUpdateRegionMapping(mapping);
+
+    assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isNull();
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
deleted file mode 100644
index 8e557f5..0000000
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import static org.assertj.core.api.Assertions.*;
-
-import java.util.Properties;
-
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
-import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.*;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-
-@Category(UnitTest.class)
-public class JDBCConfigurationUnitTest {
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testMissingAllRequiredProperties() {
-    Properties props = new Properties();
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("missing required properties: [url]");
-    new JDBCConfiguration(props);
-  }
-
-  @Test
-  public void testURLProperty() {
-    Properties props = new Properties();
-    props.setProperty("url", "myUrl");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getURL()).isEqualTo("myUrl");
-  }
-
-  @Test
-  public void testDefaultUser() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getUser()).isNull();
-  }
-
-  @Test
-  public void testDefaultPassword() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getPassword()).isNull();
-  }
-
-  @Test
-  public void testUser() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("user", "myUser");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getUser()).isEqualTo("myUser");
-  }
-
-  @Test
-  public void testPassword() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("password", "myPassword");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getPassword()).isEqualTo("myPassword");
-  }
-
-  @Test
-  public void testDefaultValueClassName() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getValueClassName("foo")).isNull();
-  }
-
-  @Test
-  public void testValueClassNameWithRegionNames() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("valueClassName-reg1", "cn1");
-    props.setProperty("valueClassName-reg2", "pack2.cn2");
-    props.setProperty("valueClassName-foo", "myPackage.myDomainClass");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getValueClassName("foo")).isEqualTo("myPackage.myDomainClass");
-    assertThat(config.getValueClassName("reg1")).isEqualTo("cn1");
-    assertThat(config.getValueClassName("reg2")).isEqualTo("pack2.cn2");
-  }
-
-  @Test
-  public void testDefaultIsKeyPartOfValue() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(false);
-  }
-
-  @Test
-  public void testIsKeyPartOfValueWithRegionNames() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("isKeyPartOfValue-reg1", "true");
-    props.setProperty("isKeyPartOfValue-reg2", "false");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(false);
-    assertThat(config.getIsKeyPartOfValue("reg1")).isEqualTo(true);
-    assertThat(config.getIsKeyPartOfValue("reg2")).isEqualTo(false);
-  }
-
-  @Test
-  public void testDefaultRegionToTableMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getTableForRegion("foo")).isEqualTo("foo");
-  }
-
-  @Test
-  public void testRegionToTableMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("regionToTable-reg1", "table1");
-    props.setProperty("regionToTable-reg2", "table2");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getTableForRegion("reg1")).isEqualTo("table1");
-    assertThat(config.getTableForRegion("reg2")).isEqualTo("table2");
-  }
-
-  @Test
-  public void testDefaultFieldToColumnMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getColumnForRegionField("reg1", "field1")).isEqualTo("field1");
-  }
-
-  @Test
-  public void testFieldToColumnMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn", "field1:column1");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getColumnForRegionField("reg1", "field1")).isEqualTo("column1");
-  }
-
-  @Test
-  public void testFieldToColumnMapWithMoreThanOne() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn",
-        "reg0:field2:othercolumn2, reg1:field1:column1, field2:column2, reg3:field1:othercolumn1");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getColumnForRegionField("reg1", "field1")).isEqualTo("column1");
-    assertThat(config.getColumnForRegionField("reg3", "field1")).isEqualTo("othercolumn1");
-    assertThat(config.getColumnForRegionField("reg0", "field2")).isEqualTo("othercolumn2");
-    assertThat(config.getColumnForRegionField("regAny", "field2")).isEqualTo("column2");
-    assertThat(config.getColumnForRegionField("regOther", "field2")).isEqualTo("column2");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyDuplicateFieldThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn", "field1:column1, field1:column2");
-    new JDBCConfiguration(props);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyDuplicateRegionFieldThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn", "reg1:field1:column1, reg1:field1:column2");
-    new JDBCConfiguration(props);
-  }
-}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java
new file mode 100644
index 0000000..186ba13
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class JDBCConnectionConfigurationTest {
+  private JDBCConnectionConfiguration config = new JDBCConnectionConfiguration();
+
+  @Test
+  public void initiatedWithNullValues() {
+    assertThat(config.getName()).isNull();
+    assertThat(config.getUrl()).isNull();
+    assertThat(config.getUser()).isNull();
+    assertThat(config.getPassword()).isNull();
+  }
+
+  @Test
+  public void hasCorrectName() {
+    String name = "name";
+    config.setName(name);
+    assertThat(config.getName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectUrl() {
+    String url = "url";
+    config.setUrl(url);
+    assertThat(config.getUrl()).isEqualTo(url);
+  }
+
+  @Test
+  public void hasCorrectUser() {
+    String user = "user";
+    config.setUser(user);
+    assertThat(config.getUser()).isEqualTo(user);
+  }
+
+  @Test
+  public void hasCorrectPassword() {
+    String password = "password";
+    config.setPassword(password);
+    assertThat(config.getPassword()).isEqualTo(password);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
index 0e9d85b..f9a81e1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
@@ -14,652 +14,557 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.*;
-import static com.googlecode.catchexception.CatchException.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
 
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.ArgumentCaptor;
 
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Region;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
-import org.apache.geode.connectors.jdbc.internal.JDBCManager;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxInstanceFactory;
-import org.apache.geode.pdx.internal.PdxInstanceImpl;
-import org.apache.geode.pdx.internal.PdxType;
-import org.apache.geode.test.fake.Fakes;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class JDBCManagerUnitTest {
 
-  private JDBCManager mgr;
-  private String regionName = "jdbcRegion";
-  Connection connection;
-  PreparedStatement preparedStatement;
-  PreparedStatement preparedStatement2;
-
-  private static final String ID_COLUMN_NAME = "ID";
-  private static final String NAME_COLUMN_NAME = "name";
-  private static final String AGE_COLUMN_NAME = "age";
-
-  public class TestableUpsertJDBCManager extends JDBCManager {
-
-    final int upsertReturn;
-
-    TestableUpsertJDBCManager(JDBCConfiguration config) {
-      super(config);
-      upsertReturn = 1;
-    }
-
-    TestableUpsertJDBCManager(JDBCConfiguration config, int upsertReturn) {
-      super(config);
-      this.upsertReturn = upsertReturn;
-    }
-
-    @Override
-    protected Connection createConnection(String url, String user, String password)
-        throws SQLException {
-      ResultSet rsKeys = mock(ResultSet.class);
-      when(rsKeys.next()).thenReturn(true, false);
-      when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
-
-      ResultSet rs = mock(ResultSet.class);
-      when(rs.next()).thenReturn(true, false);
-      when(rs.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
-
-      DatabaseMetaData metaData = mock(DatabaseMetaData.class);
-      when(metaData.getPrimaryKeys(null, null, regionName.toUpperCase())).thenReturn(rsKeys);
-      when(metaData.getTables(any(), any(), any(), any())).thenReturn(rs);
-
-      preparedStatement = mock(PreparedStatement.class);
-      preparedStatement2 = mock(PreparedStatement.class);
-      when(preparedStatement.getUpdateCount()).thenReturn(0);
-      when(preparedStatement2.getUpdateCount()).thenReturn(this.upsertReturn);
-
-      connection = mock(Connection.class);
-      when(connection.getMetaData()).thenReturn(metaData);
-      when(connection.prepareStatement(any())).thenReturn(preparedStatement, preparedStatement2);
-
-      return connection;
-    }
-  }
-
-  public class TestableJDBCManagerWithResultSets extends JDBCManager {
-
-    private ResultSet tableResults;
-    private ResultSet primaryKeyResults;
-    private ResultSet queryResultSet;
-    private String tableName;
-
-    TestableJDBCManagerWithResultSets(String tableName, JDBCConfiguration config,
-        ResultSet tableResults, ResultSet primaryKeyResults, ResultSet queryResultSet) {
-      super(config);
-      this.tableResults = tableResults;
-      this.primaryKeyResults = primaryKeyResults;
-      this.queryResultSet = queryResultSet;
-      this.tableName = tableName;
-    }
-
-    @Override
-    protected Connection createConnection(String url, String user, String password)
-        throws SQLException {
-      if (primaryKeyResults == null) {
-        primaryKeyResults = mock(ResultSet.class);
-        when(primaryKeyResults.next()).thenReturn(true, false);
-        when(primaryKeyResults.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
-      }
-
-      if (tableResults == null) {
-        tableResults = mock(ResultSet.class);
-        when(tableResults.next()).thenReturn(true, false);
-        when(tableResults.getString("TABLE_NAME")).thenReturn(this.tableName.toUpperCase());
-      }
+  private static final String REGION_NAME = "testRegion";
+  private static final String CONFIG_NAME = "configName";
 
-      DatabaseMetaData metaData = mock(DatabaseMetaData.class);
-      when(metaData.getPrimaryKeys(null, null, this.tableName.toUpperCase()))
-          .thenReturn(primaryKeyResults);
-      when(metaData.getTables(any(), any(), any(), any())).thenReturn(tableResults);
+  private JDBCConfigurationService configService;
+  private JDBCManager manager;
 
-      preparedStatement = mock(PreparedStatement.class);
-      when(preparedStatement.getUpdateCount()).thenReturn(1);
-
-
-      if (this.queryResultSet == null) {
-        queryResultSet = mock(ResultSet.class);
-        ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
-        when(rsmd.getColumnCount()).thenReturn(3);
-        when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
-        when(queryResultSet.getMetaData()).thenReturn(rsmd);
-        when(queryResultSet.next()).thenReturn(true, false);
-        when(queryResultSet.getObject(anyInt())).thenReturn("1", "Emp1", 21);
-      }
-
-      when(preparedStatement.executeQuery()).thenReturn(queryResultSet);
-
-      connection = mock(Connection.class);
-      when(connection.getMetaData()).thenReturn(metaData);
-      when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-
-      return connection;
-    }
-  }
 
   @Before
-  public void setUp() throws Exception {}
-
-  @After
-  public void tearDown() throws Exception {}
-
-  private void createManager(String... args) throws SQLException {
-    createManagerWithTableName(regionName, args);
-  }
-
-  private void createManagerWithTableName(String tableName, String... args) throws SQLException {
-    ResultSet rsKeys = mock(ResultSet.class);
-    when(rsKeys.next()).thenReturn(true, false);
-    when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
-
-    ResultSet rs = mock(ResultSet.class);
-    when(rs.next()).thenReturn(true, false);
-    when(rs.getString("TABLE_NAME")).thenReturn(tableName.toUpperCase());
-
-    this.mgr = new TestableJDBCManagerWithResultSets(tableName, createConfiguration(args), rs,
-        rsKeys, null);
-  }
-
-  private void createDefaultManager() throws SQLException {
-    createManager();
-  }
-
-  private void createUpsertManager() {
-    this.mgr = new TestableUpsertJDBCManager(createConfiguration());
-  }
-
-  private void createUpsertManager(int upsertReturn) {
-    this.mgr = new TestableUpsertJDBCManager(createConfiguration(), upsertReturn);
-  }
-
-  private void createManager(ResultSet tableNames, ResultSet primaryKeys, ResultSet queryResultSet,
-      String... args) {
-    this.mgr = new TestableJDBCManagerWithResultSets(regionName, createConfiguration(args),
-        tableNames, primaryKeys, queryResultSet);
-  }
-
-  private JDBCConfiguration createConfiguration(String... args) {
-    Properties props = new Properties();
-    props.setProperty("url", "fakeURL");
-    String[] argsArray = args;
-    if (argsArray != null) {
-      assert argsArray.length % 2 == 0;
-      for (int i = 0; i < argsArray.length; i += 2) {
-        props.setProperty(argsArray[i], argsArray[i + 1]);
-      }
-    }
-    return new JDBCConfiguration(props);
+  public void setup() {
+    configService = mock(JDBCConfigurationService.class);
+    manager = new JDBCManager(configService);
   }
 
   @Test
-  public void verifySimpleCreateCallsExecute() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
-        + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
+  public void getsCorrectMapping() {
+    manager.getMappingForRegion(REGION_NAME);
+    verify(configService).getMappingForRegion(REGION_NAME);
   }
 
   @Test
-  public void verifySimpleCreateWithIdField() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21, true);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
-        + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
+  public void getsCorrectConnectionConfig() {
+    manager.getConnectionConfig(CONFIG_NAME);
+    verify(configService).getConnectionConfig(CONFIG_NAME);
   }
 
   @Test
-  public void verifySimpleUpdateCallsExecute() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.UPDATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
+  public void retrievesANewConnection() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    Connection returnedConnection = spyManager.getConnection(connectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
   }
 
   @Test
-  public void verifySimpleDestroyCallsExecute() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    this.mgr.write(region, Operation.DESTROY, "1", null);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("DELETE FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
+  public void retrievesSameConnectionForSameConnectionConfig() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    Connection returnedConnection = spyManager.getConnection(connectionConfig);
+    Connection secondReturnedConnection = spyManager.getConnection(connectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+    assertThat(secondReturnedConnection).isNotNull().isSameAs(connection);
   }
 
   @Test
-  public void verifyTwoCreatesReuseSameStatement() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    PdxInstanceImpl pdx2 = mockPdxInstance("Emp2", 55);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    this.mgr.write(region, Operation.CREATE, "2", pdx2);
-    verify(this.preparedStatement, times(2)).execute();
-    verify(this.connection).prepareStatement(any());
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(6)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    assertThat(allObjects.get(3)).isEqualTo("Emp2");
-    assertThat(allObjects.get(4)).isEqualTo(55);
-    assertThat(allObjects.get(5)).isEqualTo("2");
-  }
-
-  private PdxInstanceImpl mockPdxInstance(String name, int age) {
-    return mockPdxInstance(name, age, false);
-  }
-
-  private PdxInstanceImpl mockPdxInstance(String name, int age, boolean addId) {
-    PdxInstanceImpl pdxInstance = mock(PdxInstanceImpl.class);
-    if (addId) {
-      when(pdxInstance.getFieldNames())
-          .thenReturn(Arrays.asList(NAME_COLUMN_NAME, AGE_COLUMN_NAME, ID_COLUMN_NAME));
-      when(pdxInstance.getField(ID_COLUMN_NAME)).thenReturn("bogusId");
-    } else {
-      when(pdxInstance.getFieldNames())
-          .thenReturn(Arrays.asList(NAME_COLUMN_NAME, AGE_COLUMN_NAME));
-    }
-    when(pdxInstance.getField(NAME_COLUMN_NAME)).thenReturn(name);
-    when(pdxInstance.getField(AGE_COLUMN_NAME)).thenReturn(age);
-    PdxType pdxType = mock(PdxType.class);
-    when(pdxType.getTypeId()).thenReturn(1);
-    when(pdxInstance.getPdxType()).thenReturn(pdxType);
-    return pdxInstance;
+  public void retrievesDifferentConnectionForEachConfig() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    Connection secondConnection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    JDBCConnectionConfiguration secondConnectionConfig =
+        getTestConnectionConfig("newName", "url", null, null);
+
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    doReturn(secondConnection).when(spyManager).getSQLConnection(secondConnectionConfig);
+    Connection returnedConnection = spyManager.getConnection(connectionConfig);
+    Connection secondReturnedConnection = spyManager.getConnection(secondConnectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+    assertThat(secondReturnedConnection).isNotNull().isSameAs(secondConnection);
+    assertThat(returnedConnection).isNotSameAs(secondReturnedConnection);
   }
 
   @Test
-  public void verifySimpleInvalidateIsUnsupported() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    catchException(this.mgr).write(region, Operation.INVALIDATE, "1", pdx1);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage()).isEqualTo("unsupported operation INVALIDATE");
-  }
+  public void retrivesANewConnectionIfCachedOneIsClosed() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    when(connection.isClosed()).thenReturn(true);
+    Connection secondConnection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    spyManager.getConnection(connectionConfig);
 
-  @Test
-  public void verifyNoTableForRegion() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region("badRegion", cache);
-    catchException(this.mgr).write(region, Operation.DESTROY, "1", null);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("no table was found that matches badRegion");
+    doReturn(secondConnection).when(spyManager).getSQLConnection(connectionConfig);
+    Connection secondReturnedConnection = spyManager.getConnection(connectionConfig);
+    assertThat(secondReturnedConnection).isSameAs(secondConnection);
   }
 
   @Test
-  public void callClose() throws SQLException {
-    createDefaultManager();
-    this.mgr.close();
-  }
-
-  @Test
-  public void callCloseWithConnection() throws SQLException {
-    createDefaultManager();
-    this.mgr.getConnection(null, null);
-    this.mgr.close();
-  }
-
-  @Test
-  public void verifyInsertUpdate() throws SQLException {
-    createUpsertManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    verify(this.preparedStatement2).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture());
-    List<String> allArgs = sqlCaptor.getAllValues();
-    assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME + ", "
-        + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
-    allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
-
-  @Test
-  public void verifyUpdateInsert() throws SQLException {
-    createUpsertManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.UPDATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    verify(this.preparedStatement2).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture());
-    List<String> allArgs = sqlCaptor.getAllValues();
-    assertThat(allArgs.get(0)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    assertThat(allArgs.get(1)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME + ", "
-        + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
-    allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
-
-  @Test
-  public void verifyInsertUpdateThatUpdatesNothing() throws SQLException {
-    createUpsertManager(0);
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    catchException(this.mgr).write(region, Operation.CREATE, "1", pdx1);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage()).isEqualTo("Unexpected updateCount 0");
-    verify(this.preparedStatement).execute();
-    verify(this.preparedStatement2).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture());
-    List<String> allArgs = sqlCaptor.getAllValues();
-    assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME + ", "
-        + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
-    allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
-
-  @Test
-  public void twoTablesOfSameName() throws SQLException {
-    ResultSet primaryKeys = null;
-    ResultSet tables = mock(ResultSet.class);
-    when(tables.next()).thenReturn(true, true, false);
-    when(tables.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
-    createManager(tables, primaryKeys, null);
-    catchException(this.mgr).computeKeyColumnName(regionName);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage()).isEqualTo("Duplicate tables that match region name");
-  }
-
-  @Test
-  public void noPrimaryKeyOnTable() throws SQLException {
-    ResultSet tables = null;
-    ResultSet primaryKeys = mock(ResultSet.class);
-    when(primaryKeys.next()).thenReturn(false);
-    createManager(tables, primaryKeys, null);
-    catchException(this.mgr).computeKeyColumnName(regionName);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("The table " + regionName + " does not have a primary key column.");
-  }
-
-  @Test
-  public void twoPrimaryKeysOnTable() throws SQLException {
-    ResultSet tables = null;
-    ResultSet primaryKeys = mock(ResultSet.class);
-    when(primaryKeys.next()).thenReturn(true, true, false);
-    createManager(tables, primaryKeys, null);
-    catchException(this.mgr).computeKeyColumnName(regionName);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("The table " + regionName + " has more than one primary key column.");
-  }
-
-  @Test
-  public void verifyReadThatMissesReturnsNull() throws SQLException {
-    ResultSet queryResults = mock(ResultSet.class);
-    when(queryResults.next()).thenReturn(false);
-    createManager(null, null, queryResults);
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    assertThat(this.mgr.read(region, "2")).isNull();
-  }
-
-  @Test
-  public void verifyReadWithMultipleResultsFails() throws SQLException {
-    ResultSet queryResults = mock(ResultSet.class);
-    when(queryResults.next()).thenReturn(true, true, false);
-    ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
-    when(rsmd.getColumnCount()).thenReturn(3);
-    when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
-    when(queryResults.getMetaData()).thenReturn(rsmd);
-    createManager(null, null, queryResults);
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-    Region region = Fakes.region(regionName, cache);
-    catchException(this.mgr).read(region, "1");
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("Multiple rows returned for key 1 on table " + regionName);
-  }
-
-  @Test
-  public void verifyReadThatHitsReturnsValue() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
-  }
-
-  @Test
-  public void verifyReadWithValueClassName() throws SQLException {
-    createManager("valueClassName-" + regionName, "myValueClass");
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("myValueClass")).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
-  }
-
-  @Test
-  public void verifyReadWithKeyPartOfValue() throws SQLException {
-    createManager("isKeyPartOfValue-" + regionName, "true");
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(3)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("id", "name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("1", "Emp1", 21));
-  }
-
-  @Test
-  public void verifyRegionToTable() throws SQLException {
-    String tableName = "mySqlTable";
-    createManagerWithTableName(tableName, "regionToTable-" + regionName, tableName);
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + tableName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
-  }
-
-  @Test
-  public void verifyFieldToColumn() throws SQLException {
-    createManager("fieldToColumn", "name:columnName, " + regionName + ":age:columnAge");
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + "columnName"
-        + ", " + "columnAge" + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
+  public void closesAllConnections() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    Connection secondConnection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    JDBCConnectionConfiguration secondConnectionConfig =
+        getTestConnectionConfig("newName", "url", null, null);
+
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    doReturn(secondConnection).when(spyManager).getSQLConnection(secondConnectionConfig);
+    spyManager.getConnection(connectionConfig);
+    spyManager.getConnection(secondConnectionConfig);
+
+    spyManager.close();
+    verify(connection).close();
+    verify(secondConnection).close();
+  }
+
+  private JDBCConnectionConfiguration getTestConnectionConfig(String name, String url, String user,
+      String password) {
+    JDBCConnectionConfiguration config = new JDBCConnectionConfiguration();
+    config.setName(name);
+    config.setUrl(url);
+    config.setUser(user);
+    config.setPassword(password);
+    return config;
+  }
+
+
+  /*
+   * private JDBCManager mgr; private String regionName = "jdbcRegion"; private
+   * JDBCConfigurationService configService; private Connection connection; private
+   * PreparedStatement preparedStatement; private PreparedStatement preparedStatement2;
+   * 
+   * private static final String ID_COLUMN_NAME = "ID"; private static final String NAME_COLUMN_NAME
+   * = "name"; private static final String AGE_COLUMN_NAME = "age";
+   */
+
+  /*
+   * public class TestableUpsertJDBCManager extends JDBCManager {
+   * 
+   * final int upsertReturn;
+   * 
+   * TestableUpsertJDBCManager(JDBCConfigurationService config) { super(config); upsertReturn = 1; }
+   * 
+   * TestableUpsertJDBCManager(JDBCConfigurationService config, int upsertReturn) { super(config);
+   * this.upsertReturn = upsertReturn; }
+   * 
+   * @Override protected Connection createConnection(String url, String user, String password)
+   * throws SQLException { ResultSet rsKeys = mock(ResultSet.class);
+   * when(rsKeys.next()).thenReturn(true, false);
+   * when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
+   * 
+   * ResultSet rs = mock(ResultSet.class); when(rs.next()).thenReturn(true, false);
+   * when(rs.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
+   * 
+   * DatabaseMetaData metaData = mock(DatabaseMetaData.class); when(metaData.getPrimaryKeys(null,
+   * null, regionName.toUpperCase())).thenReturn(rsKeys); when(metaData.getTables(any(), any(),
+   * any(), any())).thenReturn(rs);
+   * 
+   * preparedStatement = mock(PreparedStatement.class); preparedStatement2 =
+   * mock(PreparedStatement.class); when(preparedStatement.getUpdateCount()).thenReturn(0);
+   * when(preparedStatement2.getUpdateCount()).thenReturn(this.upsertReturn);
+   * 
+   * connection = mock(Connection.class); when(connection.getMetaData()).thenReturn(metaData);
+   * when(connection.prepareStatement(any())).thenReturn(preparedStatement, preparedStatement2);
+   * 
+   * return connection; } }
+   */
+
+  /*
+   * public class TestableJDBCManagerWithResultSets extends JDBCManager {
+   * 
+   * private ResultSet tableResults; private ResultSet primaryKeyResults; private ResultSet
+   * queryResultSet; private String tableName;
+   * 
+   * TestableJDBCManagerWithResultSets(String tableName, JDBCConfigurationService config, ResultSet
+   * tableResults, ResultSet primaryKeyResults, ResultSet queryResultSet) { super(config);
+   * this.tableResults = tableResults; this.primaryKeyResults = primaryKeyResults;
+   * this.queryResultSet = queryResultSet; this.tableName = tableName; }
+   * 
+   * @Override protected Connection createConnection(String url, String user, String password)
+   * throws SQLException { if (primaryKeyResults == null) { primaryKeyResults =
+   * mock(ResultSet.class); when(primaryKeyResults.next()).thenReturn(true, false);
+   * when(primaryKeyResults.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME); }
+   * 
+   * if (tableResults == null) { tableResults = mock(ResultSet.class);
+   * when(tableResults.next()).thenReturn(true, false);
+   * when(tableResults.getString("TABLE_NAME")).thenReturn(this.tableName.toUpperCase()); }
+   * 
+   * DatabaseMetaData metaData = mock(DatabaseMetaData.class); when(metaData.getPrimaryKeys(null,
+   * null, this.tableName.toUpperCase())) .thenReturn(primaryKeyResults);
+   * when(metaData.getTables(any(), any(), any(), any())).thenReturn(tableResults);
+   * 
+   * preparedStatement = mock(PreparedStatement.class);
+   * when(preparedStatement.getUpdateCount()).thenReturn(1);
+   * 
+   * 
+   * if (this.queryResultSet == null) { queryResultSet = mock(ResultSet.class); ResultSetMetaData
+   * rsmd = mock(ResultSetMetaData.class); when(rsmd.getColumnCount()).thenReturn(3);
+   * when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
+   * when(queryResultSet.getMetaData()).thenReturn(rsmd);
+   * when(queryResultSet.next()).thenReturn(true, false);
+   * when(queryResultSet.getObject(anyInt())).thenReturn("1", "Emp1", 21); }
+   * 
+   * when(preparedStatement.executeQuery()).thenReturn(queryResultSet);
+   * 
+   * connection = mock(Connection.class); when(connection.getMetaData()).thenReturn(metaData);
+   * when(connection.prepareStatement(any())).thenReturn(preparedStatement);
+   * 
+   * return connection; } }
+   */
+
+  /*
+   * @Before public void setUp() throws Exception { configService =
+   * mock(JDBCConfigurationService.class); }
+   * 
+   * @After public void tearDown() throws Exception {}
+   * 
+   * private void createManager(String... args) throws SQLException {
+   * createManagerWithTableName(regionName, args); }
+   * 
+   * private void createManagerWithTableName(String tableName, String... args) throws SQLException {
+   * ResultSet rsKeys = mock(ResultSet.class); when(rsKeys.next()).thenReturn(true, false);
+   * when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
+   * 
+   * ResultSet rs = mock(ResultSet.class); when(rs.next()).thenReturn(true, false);
+   * when(rs.getString("TABLE_NAME")).thenReturn(tableName.toUpperCase());
+   * 
+   * this.mgr = new JDBCManager(tableName, createConfiguration(args), rs, rsKeys, null); }
+   * 
+   * private void createDefaultManager() throws SQLException { createManager(); }
+   * 
+   * private void createUpsertManager() { this.mgr = new
+   * TestableUpsertJDBCManager(createConfiguration()); }
+   * 
+   * private void createUpsertManager(int upsertReturn) { this.mgr = new
+   * TestableUpsertJDBCManager(createConfiguration(), upsertReturn); }
+   * 
+   * private void createManager(ResultSet tableNames, ResultSet primaryKeys, ResultSet
+   * queryResultSet, String... args) { this.mgr = new TestableJDBCManagerWithResultSets(regionName,
+   * createConfiguration(args), tableNames, primaryKeys, queryResultSet); }
+   * 
+   * private JDBCConfigurationService createConfiguration(String... args) { Properties props = new
+   * Properties(); props.setProperty("url", "fakeURL"); String[] argsArray = args; if (argsArray !=
+   * null) { assert argsArray.length % 2 == 0; for (int i = 0; i < argsArray.length; i += 2) {
+   * props.setProperty(argsArray[i], argsArray[i + 1]); } } JDBCConfigurationService service = new
+   * JDBCConfigurationService(); JDBCConnectionConfiguration connectionConfig =
+   * service.addOrUpdateConnectionConfig(conne); return service; }
+   * 
+   * @Test public void verifySimpleCreateCallsExecute() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * this.mgr.write(region, Operation.CREATE, "1", pdx1); verify(this.preparedStatement).execute();
+   * ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
+   * + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifySimpleCreateWithIdField() throws SQLException { createDefaultManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region(regionName, cache);
+   * PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21, true); this.mgr.write(region,
+   * Operation.CREATE, "1", pdx1); verify(this.preparedStatement).execute(); ArgumentCaptor<String>
+   * sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
+   * + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifySimpleUpdateCallsExecute() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * this.mgr.write(region, Operation.UPDATE, "1", pdx1); verify(this.preparedStatement).execute();
+   * ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
+   * + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifySimpleDestroyCallsExecute() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); this.mgr.write(region, Operation.DESTROY, "1", null);
+   * verify(this.preparedStatement).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("DELETE FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); }
+   * 
+   * @Test public void verifyTwoCreatesReuseSameStatement() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * PdxInstanceImpl pdx2 = mockPdxInstance("Emp2", 55); this.mgr.write(region, Operation.CREATE,
+   * "1", pdx1); this.mgr.write(region, Operation.CREATE, "2", pdx2); verify(this.preparedStatement,
+   * times(2)).execute(); verify(this.connection).prepareStatement(any()); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(6)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * assertThat(allObjects.get(3)).isEqualTo("Emp2"); assertThat(allObjects.get(4)).isEqualTo(55);
+   * assertThat(allObjects.get(5)).isEqualTo("2"); }
+   * 
+   * private PdxInstanceImpl mockPdxInstance(String name, int age) { return mockPdxInstance(name,
+   * age, false); }
+   * 
+   * private PdxInstanceImpl mockPdxInstance(String name, int age, boolean addId) { PdxInstanceImpl
+   * pdxInstance = mock(PdxInstanceImpl.class); if (addId) { when(pdxInstance.getFieldNames())
+   * .thenReturn(Arrays.asList(NAME_COLUMN_NAME, AGE_COLUMN_NAME, ID_COLUMN_NAME));
+   * when(pdxInstance.getField(ID_COLUMN_NAME)).thenReturn("bogusId"); } else {
+   * when(pdxInstance.getFieldNames()) .thenReturn(Arrays.asList(NAME_COLUMN_NAME,
+   * AGE_COLUMN_NAME)); } when(pdxInstance.getField(NAME_COLUMN_NAME)).thenReturn(name);
+   * when(pdxInstance.getField(AGE_COLUMN_NAME)).thenReturn(age); PdxType pdxType =
+   * mock(PdxType.class); when(pdxType.getTypeId()).thenReturn(1);
+   * when(pdxInstance.getPdxType()).thenReturn(pdxType); return pdxInstance; }
+   * 
+   * @Test public void verifySimpleInvalidateIsUnsupported() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * catchException(this.mgr).write(region, Operation.INVALIDATE, "1", pdx1); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()).isEqualTo("unsupported operation INVALIDATE"); }
+   * 
+   * @Test public void verifyNoTableForRegion() throws SQLException { createDefaultManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region("badRegion", cache);
+   * catchException(this.mgr).write(region, Operation.DESTROY, "1", null); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage())
+   * .isEqualTo("no table was found that matches badRegion"); }
+   * 
+   * @Test public void callClose() throws SQLException { createDefaultManager(); this.mgr.close(); }
+   * 
+   * @Test public void callCloseWithConnection() throws SQLException { createDefaultManager();
+   * this.mgr.getConnection(null, null); this.mgr.close(); }
+   * 
+   * @Test public void verifyInsertUpdate() throws SQLException { createUpsertManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region(regionName, cache);
+   * PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21); this.mgr.write(region, Operation.CREATE,
+   * "1", pdx1); verify(this.preparedStatement).execute();
+   * verify(this.preparedStatement2).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class); verify(this.connection,
+   * times(2)).prepareStatement(sqlCaptor.capture()); List<String> allArgs =
+   * sqlCaptor.getAllValues(); assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName +
+   * "(" + NAME_COLUMN_NAME + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
+   * assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME +
+   * " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
+   * allObjects = objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifyUpdateInsert() throws SQLException { createUpsertManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region(regionName, cache);
+   * PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21); this.mgr.write(region, Operation.UPDATE,
+   * "1", pdx1); verify(this.preparedStatement).execute();
+   * verify(this.preparedStatement2).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class); verify(this.connection,
+   * times(2)).prepareStatement(sqlCaptor.capture()); List<String> allArgs =
+   * sqlCaptor.getAllValues(); assertThat(allArgs.get(0)).isEqualTo("UPDATE " + regionName + " SET "
+   * + NAME_COLUMN_NAME + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
+   * assertThat(allArgs.get(1)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME +
+   * ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
+   * allObjects = objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifyInsertUpdateThatUpdatesNothing() throws SQLException {
+   * createUpsertManager(0); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * catchException(this.mgr).write(region, Operation.CREATE, "1", pdx1); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()).isEqualTo("Unexpected updateCount 0");
+   * verify(this.preparedStatement).execute(); verify(this.preparedStatement2).execute();
+   * ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture()); List<String> allArgs =
+   * sqlCaptor.getAllValues(); assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName +
+   * "(" + NAME_COLUMN_NAME + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
+   * assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME +
+   * " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
+   * allObjects = objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void twoTablesOfSameName() throws SQLException { ResultSet primaryKeys = null;
+   * ResultSet tables = mock(ResultSet.class); when(tables.next()).thenReturn(true, true, false);
+   * when(tables.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
+   * createManager(tables, primaryKeys, null);
+   * catchException(this.mgr).computeKeyColumnName(regionName); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()).isEqualTo("Duplicate tables that match region name"
+   * ); }
+   * 
+   * @Test public void noPrimaryKeyOnTable() throws SQLException { ResultSet tables = null;
+   * ResultSet primaryKeys = mock(ResultSet.class); when(primaryKeys.next()).thenReturn(false);
+   * createManager(tables, primaryKeys, null);
+   * catchException(this.mgr).computeKeyColumnName(regionName); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()) .isEqualTo("The table " + regionName +
+   * " does not have a primary key column."); }
+   * 
+   * @Test public void twoPrimaryKeysOnTable() throws SQLException { ResultSet tables = null;
+   * ResultSet primaryKeys = mock(ResultSet.class); when(primaryKeys.next()).thenReturn(true, true,
+   * false); createManager(tables, primaryKeys, null);
+   * catchException(this.mgr).computeKeyColumnName(regionName); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()) .isEqualTo("The table " + regionName +
+   * " has more than one primary key column."); }
+   * 
+   * @Test public void verifyReadThatMissesReturnsNull() throws SQLException { ResultSet
+   * queryResults = mock(ResultSet.class); when(queryResults.next()).thenReturn(false);
+   * createManager(null, null, queryResults); GemFireCacheImpl cache = Fakes.cache(); Region region
+   * = Fakes.region(regionName, cache); assertThat(this.mgr.read(region, "2")).isNull(); }
+   * 
+   * @Test public void verifyReadWithMultipleResultsFails() throws SQLException { ResultSet
+   * queryResults = mock(ResultSet.class); when(queryResults.next()).thenReturn(true, true, false);
+   * ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
+   * when(rsmd.getColumnCount()).thenReturn(3); when(rsmd.getColumnName(anyInt())).thenReturn("ID",
+   * "NAME", "AGE"); when(queryResults.getMetaData()).thenReturn(rsmd); createManager(null, null,
+   * queryResults); GemFireCacheImpl cache = Fakes.cache(); PdxInstanceFactory factory =
+   * mock(PdxInstanceFactory.class); PdxInstance pi = mock(PdxInstance.class);
+   * when(factory.create()).thenReturn(pi); when(cache.createPdxInstanceFactory("no class",
+   * false)).thenReturn(factory); Region region = Fakes.region(regionName, cache);
+   * catchException(this.mgr).read(region, "1"); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage())
+   * .isEqualTo("Multiple rows returned for key 1 on table " + regionName); }
+   * 
+   * @Test public void verifyReadThatHitsReturnsValue() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); PdxInstanceFactory factory =
+   * mock(PdxInstanceFactory.class); PdxInstance pi = mock(PdxInstance.class);
+   * when(factory.create()).thenReturn(pi); when(cache.createPdxInstanceFactory("no class",
+   * false)).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+   * assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21)); }
+   * 
+   * @Test public void verifyReadWithValueClassName() throws SQLException {
+   * createManager("valueClassName-" + regionName, "myValueClass"); GemFireCacheImpl cache =
+   * Fakes.cache(); PdxInstanceFactory factory = mock(PdxInstanceFactory.class); PdxInstance pi =
+   * mock(PdxInstance.class); when(factory.create()).thenReturn(pi);
+   * when(cache.createPdxInstanceFactory("myValueClass")).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+   * assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21)); }
+   * 
+   * @Test public void verifyReadWithKeyPartOfValue() throws SQLException {
+   * createManager("isKeyPartOfValue-" + regionName, "true"); GemFireCacheImpl cache =
+   * Fakes.cache(); PdxInstanceFactory factory = mock(PdxInstanceFactory.class); PdxInstance pi =
+   * mock(PdxInstance.class); when(factory.create()).thenReturn(pi);
+   * when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(3)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("id", "name",
+   * "age")); assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("1", "Emp1", 21));
+   * }
+   * 
+   * @Test public void verifyRegionToTable() throws SQLException { String tableName = "mySqlTable";
+   * createManagerWithTableName(tableName, "regionToTable-" + regionName, tableName);
+   * GemFireCacheImpl cache = Fakes.cache(); PdxInstanceFactory factory =
+   * mock(PdxInstanceFactory.class); PdxInstance pi = mock(PdxInstance.class);
+   * when(factory.create()).thenReturn(pi); when(cache.createPdxInstanceFactory("no class",
+   * false)).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + tableName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+   * assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21)); }
+   * 
+   * @Test public void verifyFieldToColumn() throws SQLException { createManager("fieldToColumn",
+   * "name:columnName, " + regionName + ":age:columnAge"); GemFireCacheImpl cache = Fakes.cache();
+   * Region region = Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1",
+   * 21); this.mgr.write(region, Operation.CREATE, "1", pdx1);
+   * verify(this.preparedStatement).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + "columnName" +
+   * ", " + "columnAge" + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   */
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java
deleted file mode 100644
index ccdb2e0..0000000
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- *  * agreements. See the NOTICE file distributed with this work for additional information regarding
- *  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance with the License. You may obtain a
- *  * copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software distributed under the License
- *  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- *  * or implied. See the License for the specific language governing permissions and limitations under
- *  * the License.
- *
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.Test;
-
-public class JDBCPropertyParserTest {
-  @Test
-  public void returnsEmptyMapIfNoPropertiesPresent() {
-    Properties props = new Properties();
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, String> map = parser.getPropertiesMap("test", v -> v);
-    assertThat(map).isEmpty();
-  }
-
-  @Test
-  public void returnsMapWithKeyValuePair() {
-    Properties props = new Properties();
-    props.setProperty("name-key", "value");
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, String> map = parser.getPropertiesMap("name", v -> v);
-    assertThat(map).hasSize(1).containsKey("key").containsValue("value");
-  }
-
-  @Test
-  public void returnsMapWithMultipleKeyValuePairs() {
-    Properties props = new Properties();
-    props.setProperty("name-key1", "value1");
-    props.setProperty("name-key2", "value2");
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, String> map = parser.getPropertiesMap("name", v -> v);
-    assertThat(map).hasSize(2).containsExactly(entry("key1", "value1"), entry("key2", "value2"));
-  }
-
-  @Test
-  public void returnsMapWithBooleanValues() {
-    Properties props = new Properties();
-    props.setProperty("name-key1", "true");
-    props.setProperty("name-key2", "false");
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, Boolean> map = parser.getPropertiesMap("name", Boolean::parseBoolean);
-    assertThat(map).hasSize(2).containsExactly(entry("key1", true), entry("key2", false));
-  }
-}
\ No newline at end of file
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java
new file mode 100644
index 0000000..e9a3ca7
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class JDBCRegionMappingTest {
+  private JDBCRegionMapping mapping = new JDBCRegionMapping();
+
+  @Test
+  public void initiatedWithNullValues() {
+    assertThat(mapping.getTableName()).isNull();
+    assertThat(mapping.getRegionName()).isNull();
+    assertThat(mapping.getConnectionConfigName()).isNull();
+    assertThat(mapping.getPdxClassName()).isNull();
+  }
+
+  @Test
+  public void hasCorrectTableName() {
+    String name = "name";
+    mapping.setTableName(name);
+    assertThat(mapping.getTableName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectRegionName() {
+    String name = "name";
+    mapping.setRegionName(name);
+    assertThat(mapping.getRegionName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectConfigName() {
+    String name = "name";
+    mapping.setConnectionConfigName(name);
+    assertThat(mapping.getConnectionConfigName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectPdxClassName() {
+    String name = "name";
+    mapping.setPdxClassName(name);
+    assertThat(mapping.getPdxClassName()).isEqualTo(name);
+  }
+
+  @Test
+  public void primaryKeyInValueSetCorrectly() {
+    mapping.setPrimaryKeyInValue("true");
+    assertThat(mapping.isPrimaryKeyInValue()).isTrue();
+  }
+
+  @Test
+  public void returnsFieldNameIfColumnNotMapped() {
+    String fieldName = "myField";
+    mapping.addFieldToColumnMapping("otherField", "column");
+    assertThat(mapping.getColumnNameForField(fieldName)).isEqualTo(fieldName);
+  }
+
+  @Test
+  public void returnsMappedColumnNameForField() {
+    String fieldName = "myField";
+    String columnName = "myColumn";
+    mapping.addFieldToColumnMapping(fieldName, columnName);
+    assertThat(mapping.getColumnNameForField(fieldName)).isEqualTo(columnName);
+
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
new file mode 100644
index 0000000..55d4b0d
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+
+public class PreparedStatementCacheTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private PreparedStatementCache cache;
+  private Connection connection;
+  private List<ColumnValue> values = new ArrayList<>();
+
+  @Before
+  public void setup() throws SQLException {
+    cache = new PreparedStatementCache();
+    connection = mock(Connection.class);
+    when(connection.prepareStatement(any())).thenReturn(mock(PreparedStatement.class));
+    values.add(mock(ColumnValue.class));
+  }
+
+  @Test
+  public void returnsSameStatementForIdenticalInputs() throws Exception {
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    verify(connection, times(1)).prepareStatement(any());
+  }
+
+  @Test
+  public void returnsDifferentStatementForNonIdenticalInputs() throws Exception {
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    cache.getPreparedStatement(connection, values, "table2", Operation.UPDATE, 1);
+    verify(connection, times(2)).prepareStatement(any());
+  }
+
+  @Test()
+  public void throwsExceptionIfPreparingStatementFails() throws Exception {
+    when(connection.prepareStatement(any())).thenThrow(SQLException.class);
+    thrown.expect(IllegalStateException.class);
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+  }
+
+  @Test
+  public void throwsExceptionIfInvalidOperationGiven() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    cache.getPreparedStatement(connection, values, "table1", Operation.REGION_CLOSE, 1);
+
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java
new file mode 100644
index 0000000..a31c880
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java
@@ -0,0 +1,361 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+import org.apache.geode.pdx.internal.PdxType;
+
+public class SQLHandlerTest {
+  private static final String REGION_NAME = "testRegion";
+  private static final String TABLE_NAME = "testTable";
+  private static final Object COLUMN_VALUE_1 = new Object();
+  private static final String COLUMN_NAME_1 = "columnName1";
+  private static final Object COLUMN_VALUE_2 = new Object();
+  private static final String COLUMN_NAME_2 = "columnName2";
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private JDBCManager manager;
+  private Region region;
+  private InternalCache cache;
+  private SQLHandler handler;
+  private PreparedStatement statement;
+  private JDBCRegionMapping regionMapping;
+  private PdxInstanceImpl value;
+
+  @Before
+  public void setup() throws Exception {
+    manager = mock(JDBCManager.class);
+    region = mock(Region.class);
+    cache = mock(InternalCache.class);
+    when(region.getRegionService()).thenReturn(cache);
+    handler = new SQLHandler(manager);
+    value = mock(PdxInstanceImpl.class);
+    when(value.getPdxType()).thenReturn(mock(PdxType.class));
+    setupManagerMock();
+  }
+
+  @Test
+  public void readReturnsNullIfNoKeyProvided() {
+    thrown.expect(IllegalArgumentException.class);
+    handler.read(region, null);
+  }
+
+  @Test
+  public void usesPdxFactoryForClassWhenExists() throws Exception {
+    setupEmptyResultSet();
+    String pdxClassName = "classname";
+    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
+    handler.read(region, new Object());
+
+    verify(cache).createPdxInstanceFactory(pdxClassName);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readClearsPreparedStatementWhenFinished() throws Exception {
+    setupEmptyResultSet();
+    handler.read(region, new Object());
+    verify(statement).clearParameters();
+  }
+
+  @Test
+  public void usesPbxFactoryForNoPbxClassWhenClassNonExistent() throws Exception {
+    setupEmptyResultSet();
+    handler.read(region, new Object());
+
+    verify(cache).createPdxInstanceFactory("no class", false);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readReturnsNullIfNoResultsReturned() throws Exception {
+    setupEmptyResultSet();
+    assertThat(handler.read(region, new Object())).isNull();
+  }
+
+  @Test
+  public void throwsExceptionIfQueryFails() throws Exception {
+    when(statement.executeQuery()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    handler.read(region, new Object());
+  }
+
+  @Test
+  public void readReturnsDataFromAllResultColumns() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn("key");
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+
+    String filedName1 = COLUMN_NAME_1.toLowerCase();
+    String filedName2 = COLUMN_NAME_2.toLowerCase();
+    handler.read(region, new Object());
+    verify(factory).writeField(filedName1, COLUMN_VALUE_1, Object.class);
+    verify(factory).writeField(filedName2, COLUMN_VALUE_2, Object.class);
+    verify(factory).create();
+  }
+
+  @Test
+  public void readResultOmitsKeyColumnIfNotInValue() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn(COLUMN_NAME_1);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+
+    String filedName2 = COLUMN_NAME_2.toLowerCase();
+    handler.read(region, new Object());
+    verify(factory).writeField(filedName2, COLUMN_VALUE_2, Object.class);
+    verify(factory, times(1)).writeField(any(), any(), any());
+    verify(factory).create();
+  }
+
+  @Test
+  public void throwsExceptionIfMoreThatOneResultReturned() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true);
+    when(result.getStatement()).thenReturn(mock(PreparedStatement.class));
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn("key");
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean()))
+        .thenReturn(mock(PdxInstanceFactory.class));
+
+    thrown.expect(IllegalStateException.class);
+    handler.read(region, new Object());
+  }
+
+  @Test
+  public void writeThrowsExceptionIfValueIsNullAndNotDoingDestroy() {
+    thrown.expect(IllegalArgumentException.class);
+    handler.write(region, Operation.UPDATE, new Object(), null);
+  }
+
+  @Test
+  public void insertActionSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement).setObject(2, COLUMN_VALUE_2);
+  }
+
+  @Test
+  public void updateActionSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement).setObject(2, COLUMN_VALUE_2);
+  }
+
+  @Test
+  public void destroyActionSucceeds() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement, times(1)).setObject(anyInt(), any());
+  }
+
+  @Test
+  public void destroyActionThatRemovesNoRowCompletesUnexceptionally() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenReturn(0);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement, times(1)).setObject(anyInt(), any());
+  }
+
+  @Test
+  public void destroyThrowExceptionWhenFail() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+  }
+
+  @Test
+  public void preparedStatementClearedAfterExecution() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).clearParameters();
+  }
+
+  @Test
+  public void whenInsertFailsUpdateSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(0);
+
+    PreparedStatement updateStatement = mock(PreparedStatement.class);
+    when(updateStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(updateStatement);
+
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(updateStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(updateStatement).clearParameters();
+  }
+
+  @Test
+  public void whenUpdateFailsInsertSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(0);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(insertStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenInsertFailsWithExceptionUpdateSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement updateStatement = mock(PreparedStatement.class);
+    when(updateStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(updateStatement);
+
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(updateStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(updateStatement).clearParameters();
+  }
+
+  @Test
+  public void whenUpdateFailsWithExceptionInsertSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(insertStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenBothInsertAndUpdateFailExceptionIsThrown() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenThrow(SQLException.class);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenStatementUpdatesMultipleRowsExceptionThrown() throws Exception {
+    when(statement.executeUpdate()).thenReturn(2);
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).clearParameters();
+  }
+
+  private void setupManagerMock() throws SQLException {
+    JDBCConnectionConfiguration connectionConfig = mock(JDBCConnectionConfiguration.class);
+    when(manager.getConnectionConfig(any())).thenReturn(connectionConfig);
+
+    regionMapping = mock(JDBCRegionMapping.class);
+    when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
+    when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
+    when(manager.getMappingForRegion(any())).thenReturn(regionMapping);
+
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    columnList.add(new ColumnValue(true, COLUMN_NAME_2, COLUMN_VALUE_2));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+
+    statement = mock(PreparedStatement.class);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement);
+  }
+
+  private void setupResultSet(ResultSet result) throws SQLException {
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(result.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(2);
+
+    when(result.getObject(1)).thenReturn(COLUMN_VALUE_1);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+
+    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
+    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
+  }
+
+  private void setupEmptyResultSet() throws SQLException {
+    ResultSet result = mock(ResultSet.class);
+    when(result.next()).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
new file mode 100644
index 0000000..0f277c0
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class SqlStatementFactoryTest {
+  private static final String TABLE_NAME = "testTable";
+  private static final String KEY_COLUMN_NAME = "column1";
+
+  private List<ColumnValue> columnValues = new ArrayList<>();
+  private SqlStatementFactory factory = new SqlStatementFactory();
+
+  @Before
+  public void setup() {
+    columnValues.add(new ColumnValue(false, "column0", null));
+    columnValues.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    columnValues.add(new ColumnValue(false, "column2", null));
+  }
+
+  @Test
+  public void getSelectQueryString() throws Exception {
+    List<ColumnValue> keyColumn = new ArrayList<>();
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    String statement = factory.createSelectQueryString(TABLE_NAME, keyColumn);
+    String expectedStatement =
+        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getDestroySqlString() throws Exception {
+    List<ColumnValue> keyColumn = new ArrayList<>();
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    String statement = factory.createDestroySqlString(TABLE_NAME, keyColumn);
+    String expectedStatement =
+        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getUpdateSqlString() throws Exception {
+    String statement = factory.createUpdateSqlString(TABLE_NAME, columnValues);
+    String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
+        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(2).getColumnName(),
+        KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getInsertSqlString() throws Exception {
+    String statement = factory.createInsertSqlString(TABLE_NAME, columnValues);
+    String expectedStatement = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?,?,?)",
+        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(1).getColumnName(),
+        columnValues.get(2).getColumnName());
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
new file mode 100644
index 0000000..34efb6a
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+public class TestConfigService {
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String REGION_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+  private static final String CONNECTION_CONFIG_NAME = "testConnectionConfig";
+
+  public static JDBCConfigurationService getTestConfigService() {
+    JDBCConfigurationService service = new JDBCConfigurationService();
+    service.addOrUpdateConnectionConfig(createConnectionConfig());
+    service.addOrUpdateRegionMapping(createRegionMapping());
+    return service;
+  }
+
+  private static JDBCRegionMapping createRegionMapping() {
+    JDBCRegionMapping mapping = new JDBCRegionMapping();
+    mapping.setConnectionConfigName(CONNECTION_CONFIG_NAME);
+    mapping.setTableName(REGION_TABLE_NAME);
+    mapping.setRegionName(REGION_NAME);
+    mapping.setPrimaryKeyInValue("false");
+    return mapping;
+  }
+
+  private static JDBCConnectionConfiguration createConnectionConfig() {
+    JDBCConnectionConfiguration config = new JDBCConnectionConfiguration();
+    config.setUrl(CONNECTION_URL);
+    config.setName(CONNECTION_CONFIG_NAME);
+    return config;
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.