Posted to commits@geode.apache.org by nr...@apache.org on 2017/11/10 23:04:39 UTC

[geode] branch feature/GEODE-3781 updated (5b9698d -> c5325c1)

This is an automated email from the ASF dual-hosted git repository.

nreich pushed a change to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard 5b9698d  parser now detects empty fields around the jdbc separator
 discard 2afcd77  config can now make a column name to pdx field name. Configuration map keys are now case insensitive.
     new 002f656  simplify jdbc configuration
     new c5325c1  Rearchitect JDBC connector for easier testing Add new system for handling configuration Expand test coverage for connector

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5b9698d)
            \
             N -- N -- N   refs/heads/feature/GEODE-3781 (c5325c1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/connectors/jdbc/JDBCAsyncWriter.java     |   19 +-
 .../apache/geode/connectors/jdbc/JDBCLoader.java   |   18 +-
 .../connectors/jdbc/JDBCSynchronousWriter.java     |   21 +-
 .../connectors/jdbc/internal/ColumnValue.java      |   14 +-
 .../jdbc/internal/JDBCConfiguration.java           |  386 -------
 .../jdbc/internal/JDBCConfigurationService.java    |   40 +
 .../jdbc/internal/JDBCConnectionConfiguration.java |   54 +
 .../connectors/jdbc/internal/JDBCManager.java      |  384 ++-----
 .../jdbc/internal/JDBCRegionMapping.java           |   75 ++
 .../jdbc/internal/PreparedStatementCache.java      |  103 ++
 .../geode/connectors/jdbc/internal/SQLHandler.java |  182 +++
 .../jdbc/internal/SqlStatementFactory.java         |   77 ++
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       |   11 +-
 .../connectors/jdbc/JDBCLoaderIntegrationTest.java |    7 +-
 .../jdbc/JDBCSynchronousWriterIntegrationTest.java |    8 +-
 .../connectors/jdbc/internal/ColumnValueTest.java  |   52 +
 .../internal/JDBCConfigurationServiceTest.java     |   73 ++
 .../jdbc/internal/JDBCConfigurationUnitTest.java   |  435 --------
 .../internal/JDBCConnectionConfigurationTest.java  |   58 +
 .../jdbc/internal/JDBCManagerUnitTest.java         | 1157 +++++++++-----------
 .../jdbc/internal/JDBCRegionMappingTest.java       |   80 ++
 .../jdbc/internal/PreparedStatementCacheTest.java  |   79 ++
 .../connectors/jdbc/internal/SQLHandlerTest.java   |  361 ++++++
 .../jdbc/internal/SqlStatementFactoryTest.java     |   76 ++
 .../jdbc/internal/TestConfigService.java           |   45 +
 25 files changed, 2007 insertions(+), 1808 deletions(-)
 delete mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java
 delete mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.

[geode] 01/02: simplify jdbc configuration

Posted by nr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nreich pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 002f656c709cbe2ebd21e3dcf4d3a892e30b5637
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Mon Nov 6 17:46:31 2017 -0800

    simplify jdbc configuration
---
 .../jdbc/internal/JDBCConfiguration.java           | 190 ++++++---------------
 .../connectors/jdbc/internal/JDBCManager.java      |  32 ++--
 .../jdbc/internal/JDBCPropertyParser.java          |  47 +++++
 .../jdbc/internal/JDBCConfigurationUnitTest.java   | 112 +-----------
 .../jdbc/internal/JDBCManagerUnitTest.java         |   6 +-
 .../jdbc/internal/JDBCPropertyParserTest.java      |  64 +++++++
 6 files changed, 190 insertions(+), 261 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
index d051b01..9e14112 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
@@ -18,15 +18,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.function.Function;
-import java.util.function.IntPredicate;
 
 public class JDBCConfiguration {
+  private static final boolean DEFAULT_KEY_IN_VALUE = false; //TODO: determine what default is
   private static final String URL = "url";
   private static final String USER = "user";
   private static final String PASSWORD = "password";
@@ -61,83 +59,29 @@ public class JDBCConfiguration {
    */
   private static final String FIELD_TO_COLUMN = "fieldToColumn";
 
-  private static final List<String> knownProperties =
-      Collections.unmodifiableList(Arrays.asList(URL, USER, PASSWORD, VALUE_CLASS_NAME,
-          IS_KEY_PART_OF_VALUE, REGION_TO_TABLE, FIELD_TO_COLUMN));
-
   private static final List<String> requiredProperties =
       Collections.unmodifiableList(Arrays.asList(URL));
 
   private final String url;
   private final String user;
   private final String password;
-  private final String valueClassNameDefault;
   private final Map<String, String> regionToClassMap;
-  private final boolean keyPartOfValueDefault;
   private final Map<String, Boolean> keyPartOfValueMap;
   private final Map<String, String> regionToTableMap;
   private final Map<RegionField, String> fieldToColumnMap;
 
   public JDBCConfiguration(Properties configProps) {
-    validateKnownProperties(configProps);
     validateRequiredProperties(configProps);
     this.url = configProps.getProperty(URL);
     this.user = configProps.getProperty(USER);
     this.password = configProps.getProperty(PASSWORD);
-    String valueClassNameProp = configProps.getProperty(VALUE_CLASS_NAME);
-    this.valueClassNameDefault = computeDefaultValueClassName(valueClassNameProp);
-    this.regionToClassMap = computeRegionToClassMap(valueClassNameProp);
-    String keyPartOfValueProp = configProps.getProperty(IS_KEY_PART_OF_VALUE);
-    this.keyPartOfValueDefault = computeDefaultKeyPartOfValue(keyPartOfValueProp);
-    this.keyPartOfValueMap = computeKeyPartOfValueMap(keyPartOfValueProp);
-    this.regionToTableMap = computeRegionToTableMap(configProps.getProperty(REGION_TO_TABLE));
+    JDBCPropertyParser parser = new JDBCPropertyParser(configProps);
+    this.regionToClassMap = parser.getPropertiesMap(VALUE_CLASS_NAME, v -> v);
+    this.keyPartOfValueMap = parser.getPropertiesMap(IS_KEY_PART_OF_VALUE, Boolean::parseBoolean);
+    this.regionToTableMap = parser.getPropertiesMap(REGION_TO_TABLE, v -> v);
     this.fieldToColumnMap = computeFieldToColumnMap(configProps.getProperty(FIELD_TO_COLUMN));
   }
 
-  public static class RegionField {
-    private final String regionName; // may be null
-    private final String fieldName;
-
-    public RegionField(String regionName, String fieldName) {
-      this.regionName = regionName;
-      this.fieldName = fieldName;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + fieldName.hashCode();
-      result = prime * result + ((regionName == null) ? 0 : regionName.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      RegionField other = (RegionField) obj;
-      if (!fieldName.equals(other.fieldName)) {
-        return false;
-      }
-      if (regionName == null) {
-        if (other.regionName != null) {
-          return false;
-        }
-      } else if (!regionName.equals(other.regionName)) {
-        return false;
-      }
-      return true;
-    }
-  }
-
   private Map<RegionField, String> computeFieldToColumnMap(String prop) {
     Function<String, RegionField> regionFieldParser = new Function<String, RegionField>() {
       @Override
@@ -157,26 +101,6 @@ public class JDBCConfiguration {
     return parseMap(prop, regionFieldParser, v -> v, true);
   }
 
-  private Map<String, String> computeRegionToTableMap(String prop) {
-    return parseMap(prop, k -> k, v -> v, true);
-  }
-
-  private String computeDefaultValueClassName(String valueClassNameProp) {
-    return parseDefault(VALUE_CLASS_NAME, valueClassNameProp, v -> v, null);
-  }
-
-  private Map<String, String> computeRegionToClassMap(String valueClassNameProp) {
-    return parseMap(valueClassNameProp, k -> k, v -> v, false);
-  }
-
-  private boolean computeDefaultKeyPartOfValue(String keyPartOfValueProp) {
-    return parseDefault(IS_KEY_PART_OF_VALUE, keyPartOfValueProp, Boolean::parseBoolean, false);
-  }
-
-  private Map<String, Boolean> computeKeyPartOfValueMap(String keyPartOfValueProp) {
-    return parseMap(keyPartOfValueProp, k -> k, Boolean::parseBoolean, false);
-  }
-
   private <K, V> Map<K, V> parseMap(String propertyValue, Function<String, K> keyParser,
       Function<String, V> valueParser, boolean failOnNoSeparator) {
     if (propertyValue == null) {
@@ -203,87 +127,41 @@ public class JDBCConfiguration {
     return result;
   }
 
-  private <V> V parseDefault(String propertyName, String propertyValue, Function<String, V> parser,
-      V defaultValue) {
-    if (propertyValue == null) {
-      return defaultValue;
-    }
-    V result = null;
-    List<String> items = Arrays.asList(propertyValue.split("\\s*,\\s*"));
-    for (String item : items) {
-      int idx = item.indexOf(getjdbcSeparator());
-      if (idx != -1) {
-        continue;
-      }
-      if (result != null) {
-        throw new IllegalArgumentException(propertyName
-            + " can have at most one item that does not have a " + getjdbcSeparator() + " in it.");
-      }
-      result = parser.apply(item);
-    }
-    if (result == null) {
-      result = defaultValue;
-    }
-    return result;
-  }
-
-
-  private void validateKnownProperties(Properties configProps) {
-    Set<Object> keys = new HashSet<>(configProps.keySet());
-    keys.removeAll(knownProperties);
-    if (!keys.isEmpty()) {
-      throw new IllegalArgumentException("unknown properties: " + keys);
-    }
-  }
-
   private void validateRequiredProperties(Properties configProps) {
     List<String> reqKeys = new ArrayList<>(requiredProperties);
-    reqKeys.removeAll(configProps.keySet());
+    reqKeys.removeAll(configProps.stringPropertyNames());
     if (!reqKeys.isEmpty()) {
       Collections.sort(reqKeys);
       throw new IllegalArgumentException("missing required properties: " + reqKeys);
     }
   }
 
-  public String getURL() {
+  String getURL() {
     return this.url;
   }
 
-  public String getUser() {
+  String getUser() {
     return this.user;
   }
 
-  public String getPassword() {
+  String getPassword() {
     return this.password;
   }
 
-  public String getValueClassName(String regionName) {
-    if (this.regionToClassMap == null) {
-      return this.valueClassNameDefault;
-    }
-    String result = this.regionToClassMap.get(regionName);
-    if (result == null) {
-      result = this.valueClassNameDefault;
-    }
-    return result;
+  String getValueClassName(String regionName) {
+    return regionToClassMap.get(regionName);
   }
 
-  public boolean getIsKeyPartOfValue(String regionName) {
-    if (this.keyPartOfValueMap == null) {
-      return this.keyPartOfValueDefault;
-    }
+  boolean getIsKeyPartOfValue(String regionName) {
     Boolean result = this.keyPartOfValueMap.get(regionName);
-    if (result == null) {
-      return this.keyPartOfValueDefault;
-    }
-    return result;
+    return result != null ? result : DEFAULT_KEY_IN_VALUE;
   }
 
-  protected String getjdbcSeparator() {
+  String getjdbcSeparator() {
     return JDBC_SEPARATOR;
   }
 
-  public String getTableForRegion(String regionName) {
+  String getTableForRegion(String regionName) {
     if (this.regionToTableMap == null) {
       return regionName;
     }
@@ -294,7 +172,7 @@ public class JDBCConfiguration {
     return result;
   }
 
-  public String getColumnForRegionField(String regionName, String fieldName) {
+  String getColumnForRegionField(String regionName, String fieldName) {
     if (this.fieldToColumnMap == null) {
       return fieldName;
     }
@@ -309,4 +187,40 @@ public class JDBCConfiguration {
     }
     return result;
   }
+
+  public static class RegionField {
+    private final String regionName; // may be null
+    private final String fieldName;
+
+    public RegionField(String regionName, String fieldName) {
+      this.regionName = regionName;
+      this.fieldName = fieldName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      RegionField that = (RegionField) o;
+
+      if (regionName != null ? !regionName.equals(that.regionName) : that.regionName != null) {
+        return false;
+      }
+      return fieldName.equals(that.fieldName);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = regionName != null ? regionName.hashCode() : 0;
+      result = 31 * result + fieldName.hashCode();
+      return result;
+    }
+  }
+
 }
+
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
index b228924..e1d7cad 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
@@ -286,29 +286,31 @@ public class JDBCManager {
     }
 
     @Override
-    public int hashCode() {
-      return operation.hashCode() + pdxTypeId + tableName.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
+    public boolean equals(Object o) {
+      if (this == o) {
         return true;
       }
-      if (obj == null) {
-        return false;
-      }
-      StatementKey other = (StatementKey) obj;
-      if (!operation.equals(other.operation)) {
+      if (o == null || getClass() != o.getClass()) {
         return false;
       }
-      if (pdxTypeId != other.pdxTypeId) {
+
+      StatementKey that = (StatementKey) o;
+
+      if (pdxTypeId != that.pdxTypeId) {
         return false;
       }
-      if (!tableName.equals(other.tableName)) {
+      if (operation != null ? !operation.equals(that.operation) : that.operation != null) {
         return false;
       }
-      return true;
+      return tableName != null ? tableName.equals(that.tableName) : that.tableName == null;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = pdxTypeId;
+      result = 31 * result + (operation != null ? operation.hashCode() : 0);
+      result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+      return result;
     }
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java
new file mode 100644
index 0000000..167a079
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ *  * agreements. See the NOTICE file distributed with this work for additional information regarding
+ *  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance with the License. You may obtain a
+ *  * copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software distributed under the License
+ *  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ *  * or implied. See the License for the specific language governing permissions and limitations under
+ *  * the License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+
+class JDBCPropertyParser {
+  private final Properties properties;
+
+  private static final String PROPERTY_PREFIX_SEPARATOR = "-";
+  private static final String COMPOUND_VALUE_SEPARATOR = ":";
+
+
+  JDBCPropertyParser(Properties properties) {
+    this.properties = properties;
+  }
+
+  <V> Map<String, V> getPropertiesMap(String propertyPrefix, Function<String, V> valueParser) {
+    Map<String, V> map = new HashMap<>();
+    for(String propertyName : properties.stringPropertyNames()) {
+      if (propertyName.startsWith(propertyPrefix)) {
+        V value = valueParser.apply(properties.getProperty(propertyName));
+        int prefixEnd = propertyName.indexOf(PROPERTY_PREFIX_SEPARATOR);
+        String key = propertyName.substring(prefixEnd + 1);
+        map.put(key, value);
+      }
+    }
+    return map;
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
index 2eb2804..8e557f5 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
@@ -31,16 +31,6 @@ public class JDBCConfigurationUnitTest {
   public ExpectedException expectedException = ExpectedException.none();
 
   @Test
-  public void testInvalidProperty() {
-    Properties props = new Properties();
-    props.setProperty("invalid", "");
-
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("unknown properties: [invalid]");
-    new JDBCConfiguration(props);
-  }
-
-  @Test
   public void testMissingAllRequiredProperties() {
     Properties props = new Properties();
     expectedException.expect(IllegalArgumentException.class);
@@ -99,27 +89,12 @@ public class JDBCConfigurationUnitTest {
   }
 
   @Test
-  public void testValueClassName() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("valueClassName", "myPackage.myDomainClass");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getValueClassName("foo")).isEqualTo("myPackage.myDomainClass");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyThatTwoClassNamesWithNoRegionNameThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("valueClassName", "myClass1, myClass2");
-    new JDBCConfiguration(props);
-  }
-
-  @Test
   public void testValueClassNameWithRegionNames() {
     Properties props = new Properties();
     props.setProperty("url", "");
-    props.setProperty("valueClassName", "reg1:cn1   , reg2:pack2.cn2,myPackage.myDomainClass");
+    props.setProperty("valueClassName-reg1", "cn1");
+    props.setProperty("valueClassName-reg2", "pack2.cn2");
+    props.setProperty("valueClassName-foo", "myPackage.myDomainClass");
     JDBCConfiguration config = new JDBCConfiguration(props);
     assertThat(config.getValueClassName("foo")).isEqualTo("myPackage.myDomainClass");
     assertThat(config.getValueClassName("reg1")).isEqualTo("cn1");
@@ -134,59 +109,18 @@ public class JDBCConfigurationUnitTest {
     assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(false);
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyThatTwoDefaultsKeyPartOfValueThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("isKeyPartOfValue", "true, reg1:true   , reg2:false, true");
-    new JDBCConfiguration(props);
-  }
-
   @Test
   public void testIsKeyPartOfValueWithRegionNames() {
     Properties props = new Properties();
     props.setProperty("url", "");
-    props.setProperty("isKeyPartOfValue", "true, reg1:true   , reg2:false,");
+    props.setProperty("isKeyPartOfValue-reg1", "true");
+    props.setProperty("isKeyPartOfValue-reg2", "false");
     JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(true);
-    assertThat(config.getIsKeyPartOfValue("reg1")).isEqualTo(true);
-    assertThat(config.getIsKeyPartOfValue("reg2")).isEqualTo(false);
-  }
-
-  @Test
-  public void testIsKeyPartOfValueWithjdbcSeparator() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("isKeyPartOfValue", "true, reg1->true   , reg2->false");
-    JDBCConfiguration config = new TestableJDBCConfiguration(props);
-    assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(true);
-    assertThat(config.getIsKeyPartOfValue("reg1")).isEqualTo(true);
-    assertThat(config.getIsKeyPartOfValue("reg2")).isEqualTo(false);
-  }
-
-
-  @Test
-  public void testIsKeyPartOfValueWithjdbcSeparatorNoDefaultValue() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("isKeyPartOfValue", "reg1->true,reg2->false");
-    JDBCConfiguration config = new TestableJDBCConfiguration(props);
     assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(false);
     assertThat(config.getIsKeyPartOfValue("reg1")).isEqualTo(true);
     assertThat(config.getIsKeyPartOfValue("reg2")).isEqualTo(false);
   }
 
-  public static class TestableJDBCConfiguration extends JDBCConfiguration {
-    public TestableJDBCConfiguration(Properties configProps) {
-      super(configProps);
-    }
-
-    @Override
-    protected String getjdbcSeparator() {
-      return "->";
-    }
-  }
-
   @Test
   public void testDefaultRegionToTableMap() {
     Properties props = new Properties();
@@ -199,37 +133,13 @@ public class JDBCConfigurationUnitTest {
   public void testRegionToTableMap() {
     Properties props = new Properties();
     props.setProperty("url", "");
-    props.setProperty("regionToTable", "reg1:table1");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getTableForRegion("reg1")).isEqualTo("table1");
-  }
-
-  @Test
-  public void testRegionsToTablesMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("regionToTable", "reg1:table1, reg2:table2");
+    props.setProperty("regionToTable-reg1", "table1");
+    props.setProperty("regionToTable-reg2", "table2");
     JDBCConfiguration config = new JDBCConfiguration(props);
     assertThat(config.getTableForRegion("reg1")).isEqualTo("table1");
     assertThat(config.getTableForRegion("reg2")).isEqualTo("table2");
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyRegionToTableThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("regionToTable", "reg1:table1, reg2:table2, reg3");
-    new JDBCConfiguration(props);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyDuplicateRegionToTableThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("regionToTable", "reg1:table1, reg2:table2, reg2:table3");
-    new JDBCConfiguration(props);
-  }
-
   @Test
   public void testDefaultFieldToColumnMap() {
     Properties props = new Properties();
@@ -276,12 +186,4 @@ public class JDBCConfigurationUnitTest {
     props.setProperty("fieldToColumn", "reg1:field1:column1, reg1:field1:column2");
     new JDBCConfiguration(props);
   }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyFieldToColumnRequiresSeparator() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("regionToTable", "reg1:table1, reg2:table2, noSeparator");
-    new JDBCConfiguration(props);
-  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
index 36b401b..0e9d85b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
@@ -557,7 +557,7 @@ public class JDBCManagerUnitTest {
 
   @Test
   public void verifyReadWithValueClassName() throws SQLException {
-    createManager("valueClassName", regionName + ":myValueClass");
+    createManager("valueClassName-" + regionName, "myValueClass");
     GemFireCacheImpl cache = Fakes.cache();
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
     PdxInstance pi = mock(PdxInstance.class);
@@ -586,7 +586,7 @@ public class JDBCManagerUnitTest {
 
   @Test
   public void verifyReadWithKeyPartOfValue() throws SQLException {
-    createManager("isKeyPartOfValue", "true");
+    createManager("isKeyPartOfValue-" + regionName, "true");
     GemFireCacheImpl cache = Fakes.cache();
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
     PdxInstance pi = mock(PdxInstance.class);
@@ -616,7 +616,7 @@ public class JDBCManagerUnitTest {
   @Test
   public void verifyRegionToTable() throws SQLException {
     String tableName = "mySqlTable";
-    createManagerWithTableName(tableName, "regionToTable", regionName + ":" + tableName);
+    createManagerWithTableName(tableName, "regionToTable-" + regionName, tableName);
     GemFireCacheImpl cache = Fakes.cache();
     PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
     PdxInstance pi = mock(PdxInstance.class);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java
new file mode 100644
index 0000000..ccdb2e0
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ *  * agreements. See the NOTICE file distributed with this work for additional information regarding
+ *  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance with the License. You may obtain a
+ *  * copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software distributed under the License
+ *  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ *  * or implied. See the License for the specific language governing permissions and limitations under
+ *  * the License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+
+public class JDBCPropertyParserTest {
+  @Test
+  public void returnsEmptyMapIfNoPropertiesPresent() {
+    Properties props = new Properties();
+    JDBCPropertyParser parser = new JDBCPropertyParser(props);
+    Map<String, String> map = parser.getPropertiesMap("test", v -> v);
+    assertThat(map).isEmpty();
+  }
+
+  @Test
+  public void returnsMapWithKeyValuePair() {
+    Properties props = new Properties();
+    props.setProperty("name-key", "value");
+    JDBCPropertyParser parser = new JDBCPropertyParser(props);
+    Map<String, String> map = parser.getPropertiesMap("name", v -> v);
+    assertThat(map).hasSize(1).containsKey("key").containsValue("value");
+  }
+
+  @Test
+  public void returnsMapWithMultipleKeyValuePairs() {
+    Properties props = new Properties();
+    props.setProperty("name-key1", "value1");
+    props.setProperty("name-key2", "value2");
+    JDBCPropertyParser parser = new JDBCPropertyParser(props);
+    Map<String, String> map = parser.getPropertiesMap("name", v -> v);
+    assertThat(map).hasSize(2).containsExactly(entry("key1", "value1"), entry("key2", "value2"));
+  }
+
+  @Test
+  public void returnsMapWithBooleanValues() {
+    Properties props = new Properties();
+    props.setProperty("name-key1", "true");
+    props.setProperty("name-key2", "false");
+    JDBCPropertyParser parser = new JDBCPropertyParser(props);
+    Map<String, Boolean> map = parser.getPropertiesMap("name", Boolean::parseBoolean);
+    assertThat(map).hasSize(2).containsExactly(entry("key1", true), entry("key2", false));
+  }
+}
\ No newline at end of file


[geode] 02/02: Rearchitect JDBC connector for easier testing Add new system for handling configuration Expand test coverage for connector

Posted by nr...@apache.org.

nreich pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c5325c118e5a5c944cdd0490e6988e8bbd9f0873
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Fri Nov 10 14:59:33 2017 -0800

    Rearchitect JDBC connector for easier testing
    Add new system for handling configuration
    Expand test coverage for connector
---
 .../geode/connectors/jdbc/JDBCAsyncWriter.java     |   19 +-
 .../apache/geode/connectors/jdbc/JDBCLoader.java   |   18 +-
 .../connectors/jdbc/JDBCSynchronousWriter.java     |   21 +-
 .../connectors/jdbc/internal/ColumnValue.java      |   14 +-
 .../jdbc/internal/JDBCConfiguration.java           |  226 ----
 .../jdbc/internal/JDBCConfigurationService.java    |   40 +
 .../jdbc/internal/JDBCConnectionConfiguration.java |   54 +
 .../connectors/jdbc/internal/JDBCManager.java      |  381 ++-----
 .../jdbc/internal/JDBCPropertyParser.java          |   47 -
 .../jdbc/internal/JDBCRegionMapping.java           |   75 ++
 .../jdbc/internal/PreparedStatementCache.java      |  103 ++
 .../geode/connectors/jdbc/internal/SQLHandler.java |  182 ++++
 .../jdbc/internal/SqlStatementFactory.java         |   77 ++
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       |   11 +-
 .../connectors/jdbc/JDBCLoaderIntegrationTest.java |    7 +-
 .../jdbc/JDBCSynchronousWriterIntegrationTest.java |    8 +-
 .../connectors/jdbc/internal/ColumnValueTest.java  |   52 +
 .../internal/JDBCConfigurationServiceTest.java     |   73 ++
 .../jdbc/internal/JDBCConfigurationUnitTest.java   |  189 ----
 .../internal/JDBCConnectionConfigurationTest.java  |   58 +
 .../jdbc/internal/JDBCManagerUnitTest.java         | 1127 +++++++++-----------
 .../jdbc/internal/JDBCPropertyParserTest.java      |   64 --
 .../jdbc/internal/JDBCRegionMappingTest.java       |   80 ++
 .../jdbc/internal/PreparedStatementCacheTest.java  |   79 ++
 .../connectors/jdbc/internal/SQLHandlerTest.java   |  361 +++++++
 .../jdbc/internal/SqlStatementFactoryTest.java     |   76 ++
 .../jdbc/internal/TestConfigService.java           |   45 +
 27 files changed, 2004 insertions(+), 1483 deletions(-)
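
Note on the new configuration system: the sketch below is an illustration only, not code from this commit. It mirrors the shape of the JDBCConfigurationService and JDBCConnectionConfiguration classes added in this patch, which keep connection settings in a ConcurrentHashMap keyed by connection name. The names ConfigServiceSketch, ConnectionConfig and ConfigService, and the example URLs, are hypothetical stand-ins so that the example compiles on its own without the Geode sources.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConfigServiceSketch {

  // Hypothetical, simplified stand-in for JDBCConnectionConfiguration (name and URL only).
  static class ConnectionConfig {
    private final String name;
    private final String url;

    ConnectionConfig(String name, String url) {
      this.name = name;
      this.url = url;
    }

    String getName() {
      return name;
    }

    String getUrl() {
      return url;
    }
  }

  // Hypothetical stand-in for JDBCConfigurationService: a registry keyed by connection name.
  static class ConfigService {
    private final Map<String, ConnectionConfig> connectionsByName = new ConcurrentHashMap<>();

    // Returns null when no configuration was registered under this name.
    ConnectionConfig getConnectionConfig(String connectionName) {
      return connectionsByName.get(connectionName);
    }

    // Registers the configuration, replacing any earlier entry with the same name.
    void addOrUpdateConnectionConfig(ConnectionConfig config) {
      connectionsByName.put(config.getName(), config);
    }
  }

  public static void main(String[] args) {
    ConfigService service = new ConfigService();
    service.addOrUpdateConnectionConfig(new ConnectionConfig("orders", "jdbc:example:first"));
    // Adding a second configuration under the same name overwrites the first.
    service.addOrUpdateConnectionConfig(new ConnectionConfig("orders", "jdbc:example:second"));

    System.out.println(service.getConnectionConfig("orders").getUrl());
    System.out.println(service.getConnectionConfig("missing"));
  }
}
```

Because the registry is a plain object handed to its consumer, a test can populate it directly instead of building a Properties object, which appears to be the motivation behind the "easier testing" part of the commit message.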

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
index 771aec1..1050e92 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -18,13 +18,11 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.geode.CopyHelper;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.SQLHandler;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.logging.log4j.Logger;
@@ -39,6 +37,13 @@ public class JDBCAsyncWriter implements AsyncEventListener {
   private long totalEvents = 0;
   private long successfulEvents = 0;
   private JDBCManager manager;
+  private SQLHandler sqlHandler;
+
+  // Constructor for test purposes only
+  JDBCAsyncWriter(JDBCManager manager) {
+    this.manager = manager;
+    sqlHandler = new SQLHandler(manager);
+  }
 
   @Override
   public void close() {
@@ -69,7 +74,7 @@ public class JDBCAsyncWriter implements AsyncEventListener {
     try {
       for (AsyncEvent event : events) {
         try {
-          this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+          sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
               getPdxInstance(event));
           changeSuccessfulEvents(1);
         } catch (RuntimeException ex) {
@@ -85,8 +90,10 @@ public class JDBCAsyncWriter implements AsyncEventListener {
 
   @Override
   public void init(Properties props) {
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    this.manager = new JDBCManager(config);
+    /*
+     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
+     * JDBCManager(config);
+     */
   }
 
   private synchronized void changeTotalEvents(long delta) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java
index c6247bd..085272c 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCLoader.java
@@ -15,13 +15,13 @@
 package org.apache.geode.connectors.jdbc;
 
 import org.apache.geode.cache.LoaderHelper;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JDBCManager;
 
 import java.util.Properties;
 
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.connectors.jdbc.internal.SQLHandler;
 
 /*
  * This class provides loading from a data source using JDBC.
@@ -30,6 +30,13 @@ import org.apache.geode.cache.CacheLoaderException;
  */
 public class JDBCLoader<K, V> implements CacheLoader<K, V> {
   private JDBCManager manager;
+  private SQLHandler sqlHandler;
+
+  // Constructor for test purposes only
+  JDBCLoader(JDBCManager manager) {
+    this.manager = manager;
+    sqlHandler = new SQLHandler(manager);
+  }
 
   @Override
   public void close() {
@@ -47,11 +54,14 @@ public class JDBCLoader<K, V> implements CacheLoader<K, V> {
   public V load(LoaderHelper<K, V> helper) throws CacheLoaderException {
     // The following cast to V is to keep the compiler happy
     // but is erased at runtime and no actual cast happens.
-    return (V) this.manager.read(helper.getRegion(), helper.getKey());
+    return (V) sqlHandler.read(helper.getRegion(), helper.getKey());
   }
 
   public void init(Properties props) {
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    this.manager = new JDBCManager(config);
+    /*
+     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
+     * JDBCManager(config);
+     */
+    // TODO: make this get the ConfigurationService?
   };
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java
index ad741f0..a77c95b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriter.java
@@ -23,8 +23,8 @@ import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
 import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.SQLHandler;
 import org.apache.geode.pdx.PdxInstance;
 
 /*
@@ -34,11 +34,20 @@ import org.apache.geode.pdx.PdxInstance;
  */
 public class JDBCSynchronousWriter<K, V> implements CacheWriter<K, V> {
   private JDBCManager manager;
+  private SQLHandler sqlHandler;
+
+  // Constructor for test purposes only
+  JDBCSynchronousWriter(JDBCManager manager) {
+    this.manager = manager;
+    sqlHandler = new SQLHandler(manager);
+  }
 
   @Override
   public void init(Properties props) {
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    this.manager = new JDBCManager(config);
+    /*
+     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
+     * JDBCManager(config);
+     */
   }
 
   @Override
@@ -69,19 +78,19 @@ public class JDBCSynchronousWriter<K, V> implements CacheWriter<K, V> {
 
   @Override
   public void beforeUpdate(EntryEvent<K, V> event) throws CacheWriterException {
-    this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
   @Override
   public void beforeCreate(EntryEvent<K, V> event) throws CacheWriterException {
-    this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
   @Override
   public void beforeDestroy(EntryEvent<K, V> event) throws CacheWriterException {
-    this.manager.write(event.getRegion(), event.getOperation(), event.getKey(),
+    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
index 1e7803e..8ade14b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
@@ -19,22 +19,22 @@ public class ColumnValue {
   final private String columnName;
   final private Object value;
 
-  public ColumnValue(boolean isKey, String columnName, Object value) {
+  ColumnValue(boolean isKey, String columnName, Object value) {
     this.isKey = isKey;
     this.columnName = columnName;
     this.value = value;
   }
 
-  public boolean isKey() {
-    return this.isKey;
+  boolean isKey() {
+    return isKey;
   }
 
-  public String getColumnName() {
-    return this.columnName;
+  String getColumnName() {
+    return columnName;
   }
 
-  public Object getValue() {
-    return this.value;
+  Object getValue() {
+    return value;
   }
 
   @Override
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
deleted file mode 100644
index 9e14112..0000000
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfiguration.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Function;
-
-public class JDBCConfiguration {
-  private static final boolean DEFAULT_KEY_IN_VALUE = false; //TODO: determine what default is
-  private static final String URL = "url";
-  private static final String USER = "user";
-  private static final String PASSWORD = "password";
-  private static final String JDBC_SEPARATOR = System.getProperty("jdbcSeparator", ":");
-  /**
-   * syntax: comma separated list of booleanSpecs. booleanSpec: optional regionSpec followed by
-   * boolean. regionSpec: regionName followed by a jdbcSeparator. A 'boolean' is parsed by
-   * {@link Boolean#parseBoolean(String)}. Whitespace is only allowed around the commas. At most one
-   * classSpec without a regionSpec is allowed. A classSpec without a regionSpec defines the
-   * default. Only used by JDBCLoader.
-   */
-  private static final String IS_KEY_PART_OF_VALUE = "isKeyPartOfValue";
-
-  /**
-   * syntax: comma separated list of classSpecs. classSpec: optional regionSpec followed by
-   * className. regionSpec: regionName followed by a jdbcSeparator. Whitespace is only allowed
-   * around the commas. At most one classSpec without a regionSpec is allowed. A classSpec without a
-   * regionSpec defines the default. Only used by JDBCLoader.
-   */
-  private static final String VALUE_CLASS_NAME = "valueClassName";
-
-  /**
-   * syntax: comma separated list of regionTableSpecs. regionTableSpecs: regionName followed by
-   * jdbcSeparator followed by tableName. Whitespace is only allowed around the commas.
-   */
-  private static final String REGION_TO_TABLE = "regionToTable";
-
-  /**
-   * syntax: comma separated list of fieldColumnSpecs. fieldColumnSpecs: Optional regionSpec
-   * followed by fieldName followed by jdbcSeparator followed by columnName. regionSpec: regionName
-   * followed by jdbcSeparator. Whitespace is only allowed around the commas.
-   */
-  private static final String FIELD_TO_COLUMN = "fieldToColumn";
-
-  private static final List<String> requiredProperties =
-      Collections.unmodifiableList(Arrays.asList(URL));
-
-  private final String url;
-  private final String user;
-  private final String password;
-  private final Map<String, String> regionToClassMap;
-  private final Map<String, Boolean> keyPartOfValueMap;
-  private final Map<String, String> regionToTableMap;
-  private final Map<RegionField, String> fieldToColumnMap;
-
-  public JDBCConfiguration(Properties configProps) {
-    validateRequiredProperties(configProps);
-    this.url = configProps.getProperty(URL);
-    this.user = configProps.getProperty(USER);
-    this.password = configProps.getProperty(PASSWORD);
-    JDBCPropertyParser parser = new JDBCPropertyParser(configProps);
-    this.regionToClassMap = parser.getPropertiesMap(VALUE_CLASS_NAME, v -> v);
-    this.keyPartOfValueMap = parser.getPropertiesMap(IS_KEY_PART_OF_VALUE, Boolean::parseBoolean);
-    this.regionToTableMap = parser.getPropertiesMap(REGION_TO_TABLE, v -> v);
-    this.fieldToColumnMap = computeFieldToColumnMap(configProps.getProperty(FIELD_TO_COLUMN));
-  }
-
-  private Map<RegionField, String> computeFieldToColumnMap(String prop) {
-    Function<String, RegionField> regionFieldParser = new Function<String, RegionField>() {
-      @Override
-      public RegionField apply(String item) {
-        String regionName = null;
-        String fieldName;
-        int idx = item.indexOf(getjdbcSeparator());
-        if (idx != -1) {
-          regionName = item.substring(0, idx);
-          fieldName = item.substring(idx + getjdbcSeparator().length());
-        } else {
-          fieldName = item;
-        }
-        return new RegionField(regionName, fieldName);
-      }
-    };
-    return parseMap(prop, regionFieldParser, v -> v, true);
-  }
-
-  private <K, V> Map<K, V> parseMap(String propertyValue, Function<String, K> keyParser,
-      Function<String, V> valueParser, boolean failOnNoSeparator) {
-    if (propertyValue == null) {
-      return null;
-    }
-    Map<K, V> result = new HashMap<>();
-    List<String> items = Arrays.asList(propertyValue.split("\\s*,\\s*"));
-    for (String item : items) {
-      int idx = item.lastIndexOf(getjdbcSeparator());
-      if (idx == -1) {
-        if (failOnNoSeparator) {
-          throw new IllegalArgumentException(item + " does not contain " + getjdbcSeparator());
-        }
-        continue;
-      }
-      String keyString = item.substring(0, idx);
-      String valueString = item.substring(idx + getjdbcSeparator().length());
-      K key = keyParser.apply(keyString);
-      if (result.containsKey(key)) {
-        throw new IllegalArgumentException("Duplicate item " + key + " is not allowed.");
-      }
-      result.put(key, valueParser.apply(valueString));
-    }
-    return result;
-  }
-
-  private void validateRequiredProperties(Properties configProps) {
-    List<String> reqKeys = new ArrayList<>(requiredProperties);
-    reqKeys.removeAll(configProps.stringPropertyNames());
-    if (!reqKeys.isEmpty()) {
-      Collections.sort(reqKeys);
-      throw new IllegalArgumentException("missing required properties: " + reqKeys);
-    }
-  }
-
-  String getURL() {
-    return this.url;
-  }
-
-  String getUser() {
-    return this.user;
-  }
-
-  String getPassword() {
-    return this.password;
-  }
-
-  String getValueClassName(String regionName) {
-    return regionToClassMap.get(regionName);
-  }
-
-  boolean getIsKeyPartOfValue(String regionName) {
-    Boolean result = this.keyPartOfValueMap.get(regionName);
-    return result != null ? result : DEFAULT_KEY_IN_VALUE;
-  }
-
-  String getjdbcSeparator() {
-    return JDBC_SEPARATOR;
-  }
-
-  String getTableForRegion(String regionName) {
-    if (this.regionToTableMap == null) {
-      return regionName;
-    }
-    String result = this.regionToTableMap.get(regionName);
-    if (result == null) {
-      result = regionName;
-    }
-    return result;
-  }
-
-  String getColumnForRegionField(String regionName, String fieldName) {
-    if (this.fieldToColumnMap == null) {
-      return fieldName;
-    }
-    RegionField key = new RegionField(regionName, fieldName);
-    String result = this.fieldToColumnMap.get(key);
-    if (result == null) {
-      key = new RegionField(null, fieldName);
-      result = this.fieldToColumnMap.get(key);
-      if (result == null) {
-        result = regionName;
-      }
-    }
-    return result;
-  }
-
-  public static class RegionField {
-    private final String regionName; // may be null
-    private final String fieldName;
-
-    public RegionField(String regionName, String fieldName) {
-      this.regionName = regionName;
-      this.fieldName = fieldName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      RegionField that = (RegionField) o;
-
-      if (regionName != null ? !regionName.equals(that.regionName) : that.regionName != null) {
-        return false;
-      }
-      return fieldName.equals(that.fieldName);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = regionName != null ? regionName.hashCode() : 0;
-      result = 31 * result + fieldName.hashCode();
-      return result;
-    }
-  }
-
-}
-
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java
new file mode 100644
index 0000000..0509566
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationService.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class JDBCConfigurationService {
+
+  private final Map<String, JDBCConnectionConfiguration> connectionsByName =
+      new ConcurrentHashMap<>();
+  private final Map<String, JDBCRegionMapping> mappingsByRegion = new ConcurrentHashMap<>();
+
+  JDBCConnectionConfiguration getConnectionConfig(String connectionName) {
+    return connectionsByName.get(connectionName);
+  }
+
+  JDBCRegionMapping getMappingForRegion(String regionName) {
+    return mappingsByRegion.get(regionName);
+  }
+
+  void addOrUpdateConnectionConfig(JDBCConnectionConfiguration config) {
+    connectionsByName.put(config.getName(), config);
+  }
+
+  void addOrUpdateRegionMapping(JDBCRegionMapping mapping) {
+    mappingsByRegion.put(mapping.getRegionName(), mapping);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java
new file mode 100644
index 0000000..c75aefe
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+class JDBCConnectionConfiguration {
+
+  private String name;
+  private String url;
+  private String user;
+  private String password;
+
+  String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+
+  String getUrl() {
+    return url;
+  }
+
+  void setUrl(String url) {
+    this.url = url;
+  }
+
+  String getUser() {
+    return user;
+  }
+
+  void setUser(String user) {
+    this.user = user;
+  }
+
+  String getPassword() {
+    return password;
+  }
+
+  void setPassword(String password) {
+    this.password = password;
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
index e1d7cad..be82e22 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCManager.java
@@ -23,233 +23,34 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxInstanceFactory;
-import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 public class JDBCManager {
 
-  private final JDBCConfiguration config;
+  private final JDBCConfigurationService configService;
 
-  private Connection conn;
+  private Map<String, Connection> connectionMap = new ConcurrentHashMap<>();
 
   private final ConcurrentMap<String, String> tableToPrimaryKeyMap = new ConcurrentHashMap<>();
 
-  private final ThreadLocal<Map<StatementKey, PreparedStatement>> preparedStatementCache =
-      new ThreadLocal<>();
+  private final ThreadLocal<PreparedStatementCache> preparedStatementCache = new ThreadLocal<>();
 
-  private Map<StatementKey, PreparedStatement> getPreparedStatementCache() {
-    Map<StatementKey, PreparedStatement> result = preparedStatementCache.get();
-    if (result == null) {
-      result = new HashMap<>();
-      preparedStatementCache.set(result);
-    }
-    return result;
-  }
-
-  public JDBCManager(JDBCConfiguration config) {
-    this.config = config;
-  }
-
-  private String getRegionName(Region r) {
-    return r.getName();
-  }
-
-  public PdxInstance read(Region region, Object key) {
-    final String regionName = getRegionName(region);
-    final String tableName = getTableName(regionName);
-    List<ColumnValue> columnList =
-        getColumnToValueList(regionName, tableName, key, null, Operation.GET);
-    PreparedStatement pstmt = getPreparedStatement(columnList, tableName, Operation.GET, 0);
-    synchronized (pstmt) {
-      try {
-        int idx = 0;
-        for (ColumnValue cv : columnList) {
-          idx++;
-          pstmt.setObject(idx, cv.getValue());
-        }
-        ResultSet rs = pstmt.executeQuery();
-        if (rs.next()) {
-          InternalCache cache = (InternalCache) region.getRegionService();
-          String valueClassName = getValueClassName(regionName);
-          PdxInstanceFactory factory;
-          if (valueClassName != null) {
-            factory = cache.createPdxInstanceFactory(valueClassName);
-          } else {
-            factory = cache.createPdxInstanceFactory("no class", false);
-          }
-          ResultSetMetaData rsmd = rs.getMetaData();
-          int ColumnsNumber = rsmd.getColumnCount();
-          String keyColumnName = getKeyColumnName(tableName);
-          for (int i = 1; i <= ColumnsNumber; i++) {
-            Object columnValue = rs.getObject(i);
-            String columnName = rsmd.getColumnName(i);
-            String fieldName = mapColumnNameToFieldName(columnName, tableName);
-            if (!isFieldExcluded(fieldName)
-                && (isKeyPartOfValue(regionName) || !keyColumnName.equalsIgnoreCase(columnName))) {
-              factory.writeField(fieldName, columnValue, Object.class);
-            }
-          }
-          if (rs.next()) {
-            throw new IllegalStateException(
-                "Multiple rows returned for key " + key + " on table " + tableName);
-          }
-          return factory.create();
-        } else {
-          return null;
-        }
-      } catch (SQLException e) {
-        handleSQLException(e);
-        return null; // this is never reached
-      } finally {
-        clearStatementParameters(pstmt);
-      }
-    }
-  }
-
-  private String getValueClassName(String regionName) {
-    return this.config.getValueClassName(regionName);
-  }
-
-  public void write(Region region, Operation operation, Object key, PdxInstance value) {
-    final String regionName = getRegionName(region);
-    final String tableName = getTableName(regionName);
-    int pdxTypeId = 0;
-    if (value != null) {
-      pdxTypeId = ((PdxInstanceImpl) value).getPdxType().getTypeId();
-    }
-    List<ColumnValue> columnList =
-        getColumnToValueList(regionName, tableName, key, value, operation);
-    int updateCount = executeWrite(columnList, tableName, operation, pdxTypeId, false);
-    if (operation.isDestroy()) {
-      // TODO: should we check updateCount here? Probably not. It is possible we have nothing in the
-      // table to destroy.
-      return;
-    }
-    if (updateCount <= 0) {
-      Operation upsertOp;
-      if (operation.isUpdate()) {
-        upsertOp = Operation.CREATE;
-      } else {
-        upsertOp = Operation.UPDATE;
-      }
-      updateCount = executeWrite(columnList, tableName, upsertOp, pdxTypeId, true);
-    }
-    if (updateCount != 1) {
-      throw new IllegalStateException("Unexpected updateCount " + updateCount);
-    }
+  public JDBCManager(JDBCConfigurationService configService) {
+    this.configService = configService;
   }
 
-  private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
-      int pdxTypeId, boolean handleException) {
-    PreparedStatement pstmt = getPreparedStatement(columnList, tableName, operation, pdxTypeId);
-    synchronized (pstmt) {
-      try {
-        int idx = 0;
-        for (ColumnValue cv : columnList) {
-          idx++;
-          pstmt.setObject(idx, cv.getValue());
-        }
-        pstmt.execute();
-        return pstmt.getUpdateCount();
-      } catch (SQLException e) {
-        if (handleException || operation.isDestroy()) {
-          handleSQLException(e);
-        }
-        return 0;
-      } finally {
-        clearStatementParameters(pstmt);
-      }
-    }
+  JDBCRegionMapping getMappingForRegion(String regionName) {
+    return configService.getMappingForRegion(regionName);
   }
 
-  private void clearStatementParameters(PreparedStatement ps) {
-    try {
-      ps.clearParameters();
-    } catch (SQLException ignore) {
-    }
-  }
-
-  private String getSqlString(String tableName, List<ColumnValue> columnList, Operation operation) {
-    if (operation.isCreate()) {
-      return getInsertSqlString(tableName, columnList);
-    } else if (operation.isUpdate()) {
-      return getUpdateSqlString(tableName, columnList);
-    } else if (operation.isDestroy()) {
-      return getDestroySqlString(tableName, columnList);
-    } else if (operation.isGet()) {
-      return getSelectQueryString(tableName, columnList);
-    } else {
-      throw new IllegalStateException("unsupported operation " + operation);
-    }
-  }
-
-  private String getSelectQueryString(String tableName, List<ColumnValue> columnList) {
-    assert columnList.size() == 1;
-    ColumnValue keyCV = columnList.get(0);
-    assert keyCV.isKey();
-    StringBuilder query = new StringBuilder(
-        "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?");
-    return query.toString();
-  }
-
-  private String getDestroySqlString(String tableName, List<ColumnValue> columnList) {
-    assert columnList.size() == 1;
-    ColumnValue keyCV = columnList.get(0);
-    assert keyCV.isKey();
-    StringBuilder query =
-        new StringBuilder("DELETE FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?");
-    return query.toString();
-  }
-
-  private String getUpdateSqlString(String tableName, List<ColumnValue> columnList) {
-    StringBuilder query = new StringBuilder("UPDATE " + tableName + " SET ");
-    int idx = 0;
-    for (ColumnValue cv : columnList) {
-      if (cv.isKey()) {
-        query.append(" WHERE ");
-      } else {
-        idx++;
-        if (idx > 1) {
-          query.append(", ");
-        }
-      }
-      query.append(cv.getColumnName());
-      query.append(" = ?");
-    }
-    return query.toString();
-  }
-
-  private String getInsertSqlString(String tableName, List<ColumnValue> columnList) {
-    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + '(');
-    StringBuilder columnValues = new StringBuilder(" VALUES (");
-    int columnCount = columnList.size();
-    int idx = 0;
-    for (ColumnValue cv : columnList) {
-      idx++;
-      columnNames.append(cv.getColumnName());
-      columnValues.append('?');
-      if (idx != columnCount) {
-        columnNames.append(", ");
-        columnValues.append(",");
-      }
-    }
-    columnNames.append(")");
-    columnValues.append(")");
-    return columnNames.append(columnValues).toString();
-  }
-
-  Connection getConnection(String user, String password) {
-    Connection result = this.conn;
+  Connection getConnection(JDBCConnectionConfiguration config) {
+    Connection result = connectionMap.get(config.getName());
     try {
       if (result != null && !result.isClosed()) {
         return result;
@@ -257,84 +58,26 @@ public class JDBCManager {
     } catch (SQLException ignore) {
       // If isClosed throws fall through and connect again
     }
-
+    // TODO: make thread safe by synchronizing on map of connections
     try {
-      result =
-          createConnection(this.config.getURL(), this.config.getUser(), this.config.getPassword());
+      result = getSQLConnection(config);
     } catch (SQLException e) {
       // TODO: consider a different exception
-      throw new IllegalStateException("Could not connect to " + this.config.getURL(), e);
+      throw new IllegalStateException("Could not connect to " + config.getUrl(), e);
     }
-    this.conn = result;
+    connectionMap.put(config.getName(), result);
     return result;
   }
 
-  protected Connection createConnection(String url, String user, String password)
-      throws SQLException {
-    return DriverManager.getConnection(url);
+  // package protected for testing purposes only
+  Connection getSQLConnection(JDBCConnectionConfiguration config) throws SQLException {
+    return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
   }
 
-  private static class StatementKey {
-    private final int pdxTypeId;
-    private final Operation operation;
-    private final String tableName;
-
-    public StatementKey(int pdxTypeId, Operation operation, String tableName) {
-      this.pdxTypeId = pdxTypeId;
-      this.operation = operation;
-      this.tableName = tableName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      StatementKey that = (StatementKey) o;
-
-      if (pdxTypeId != that.pdxTypeId) {
-        return false;
-      }
-      if (operation != null ? !operation.equals(that.operation) : that.operation != null) {
-        return false;
-      }
-      return tableName != null ? tableName.equals(that.tableName) : that.tableName == null;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = pdxTypeId;
-      result = 31 * result + (operation != null ? operation.hashCode() : 0);
-      result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
-      return result;
-    }
-  }
-
-  private PreparedStatement getPreparedStatement(List<ColumnValue> columnList, String tableName,
-      Operation operation, int pdxTypeId) {
-    System.out.println("getPreparedStatement : " + pdxTypeId + "operation: " + operation
-        + " columns: " + columnList);
-    StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
-    return getPreparedStatementCache().computeIfAbsent(key, k -> {
-      String sqlStr = getSqlString(tableName, columnList, operation);
-      System.out.println("sql=" + sqlStr); // TODO remove debugging
-      Connection con = getConnection(null, null);
-      try {
-        return con.prepareStatement(sqlStr);
-      } catch (SQLException e) {
-        handleSQLException(e);
-        return null; // this line is never reached
-      }
-    });
-  }
-
-  private List<ColumnValue> getColumnToValueList(String regionName, String tableName, Object key,
-      PdxInstance value, Operation operation) {
-    String keyColumnName = getKeyColumnName(tableName);
+  // TODO write unit tests
+  List<ColumnValue> getColumnToValueList(JDBCConnectionConfiguration config,
+      JDBCRegionMapping regionMapping, Object key, PdxInstance value, Operation operation) {
+    String keyColumnName = getKeyColumnName(config, regionMapping.getTableName());
     ColumnValue keyCV = new ColumnValue(true, keyColumnName, key);
     if (operation.isDestroy() || operation.isGet()) {
       return Collections.singletonList(keyCV);
@@ -343,55 +86,40 @@ public class JDBCManager {
     List<String> fieldNames = value.getFieldNames();
     List<ColumnValue> result = new ArrayList<>(fieldNames.size() + 1);
     for (String fieldName : fieldNames) {
-      if (isFieldExcluded(fieldName)) {
-        continue;
-      }
-      String columnName = mapFieldNameToColumnName(regionName, fieldName);
+      String columnName = regionMapping.getColumnNameForField(fieldName);
       if (columnName.equalsIgnoreCase(keyColumnName)) {
         continue;
       }
       Object columnValue = value.getField(fieldName);
       ColumnValue cv = new ColumnValue(false, columnName, columnValue);
-      // TODO: any need to order the items in the list?
       result.add(cv);
     }
     result.add(keyCV);
     return result;
   }
 
-  private boolean isFieldExcluded(String fieldName) {
-    // TODO check configuration
-    return false;
-  }
-
-  private String mapFieldNameToColumnName(String regionName, String fieldName) {
-    return this.config.getColumnForRegionField(regionName, fieldName);
-  }
-
   private String mapColumnNameToFieldName(String columnName, String tableName) {
     // TODO check config for mapping
     return columnName.toLowerCase();
   }
 
-  private boolean isKeyPartOfValue(String regionName) {
-    return this.config.getIsKeyPartOfValue(regionName);
-  }
-
-  private String getKeyColumnName(String tableName) {
+  String getKeyColumnName(JDBCConnectionConfiguration connectionConfig, String tableName) {
     return tableToPrimaryKeyMap.computeIfAbsent(tableName, k -> {
-      return computeKeyColumnName(k);
+      return computeKeyColumnName(connectionConfig, k);
     });
   }
 
-  String computeKeyColumnName(String tableName) {
+  private String computeKeyColumnName(JDBCConnectionConfiguration connectionConfig,
+      String tableName) {
     // TODO: check config for key column
-    Connection con = getConnection(null, null);
+    Connection connection = getConnection(connectionConfig);
+    String key;
     try {
-      DatabaseMetaData metaData = con.getMetaData();
-      ResultSet tablesRS = metaData.getTables(null, null, "%", null);
+      DatabaseMetaData metaData = connection.getMetaData();
+      ResultSet tables = metaData.getTables(null, null, "%", null);
       String realTableName = null;
-      while (tablesRS.next()) {
-        String name = tablesRS.getString("TABLE_NAME");
+      while (tables.next()) {
+        String name = tables.getString("TABLE_NAME");
         if (name.equalsIgnoreCase(tableName)) {
           if (realTableName != null) {
             throw new IllegalStateException("Duplicate tables that match region name");
@@ -407,39 +135,35 @@ public class JDBCManager {
         throw new IllegalStateException(
             "The table " + tableName + " does not have a primary key column.");
       }
-      String key = primaryKeys.getString("COLUMN_NAME");
+      key = primaryKeys.getString("COLUMN_NAME");
       if (primaryKeys.next()) {
         throw new IllegalStateException(
             "The table " + tableName + " has more than one primary key column.");
       }
-      return key;
     } catch (SQLException e) {
       handleSQLException(e);
-      return null; // never reached
+      key = null; // never reached
     }
+    return key;
   }
 
   private void handleSQLException(SQLException e) {
     throw new IllegalStateException("NYI: handleSQLException", e);
   }
 
-  private String getTableName(String regionName) {
-    return config.getTableForRegion(regionName);
-  }
-
-  private void printResultSet(ResultSet rs) {
+  private void printResultSet(ResultSet resultSets) {
     System.out.println("Printing ResultSet:");
     try {
       int size = 0;
-      ResultSetMetaData rsmd = rs.getMetaData();
-      int columnsNumber = rsmd.getColumnCount();
-      while (rs.next()) {
+      ResultSetMetaData metaData = resultSets.getMetaData();
+      int columnsNumber = metaData.getColumnCount();
+      while (resultSets.next()) {
         size++;
         for (int i = 1; i <= columnsNumber; i++) {
           if (i > 1)
             System.out.print(",  ");
-          String columnValue = rs.getString(i);
-          System.out.print(rsmd.getColumnName(i) + ": " + columnValue);
+          String columnValue = resultSets.getString(i);
+          System.out.print(metaData.getColumnName(i) + ": " + columnValue);
         }
         System.out.println("");
       }
@@ -448,7 +172,7 @@ public class JDBCManager {
       System.out.println("Exception while printing result set" + ex);
     } finally {
       try {
-        rs.beforeFirst();
+        resultSets.beforeFirst();
       } catch (SQLException e) {
         System.out.println("Exception while calling beforeFirst" + e);
       }
@@ -456,11 +180,30 @@ public class JDBCManager {
   }
 
   public void close() {
-    if (this.conn != null) {
+    connectionMap.values().forEach(connection -> close(connection));
+  }
+
+  private void close(Connection connection) {
+    if (connection != null) {
       try {
-        this.conn.close();
+        connection.close();
       } catch (SQLException ignore) {
       }
     }
   }
+
+  JDBCConnectionConfiguration getConnectionConfig(String connectionConfigName) {
+    return configService.getConnectionConfig(connectionConfigName);
+  }
+
+  PreparedStatement getPreparedStatement(Connection connection, List<ColumnValue> columnList,
+      String tableName, Operation operation, int pdxTypeId) {
+    PreparedStatementCache statementCache = preparedStatementCache.get();
+    if (statementCache == null) {
+      statementCache = new PreparedStatementCache();
+      preparedStatementCache.set(statementCache);
+    }
+    return statementCache.getPreparedStatement(connection, columnList, tableName, operation,
+        pdxTypeId);
+  }
 }
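
Reviewer note on the "TODO: make thread safe by synchronizing on map of connections" in getConnection above. One possible direction, sketched under assumptions: keep the per-name cache in a ConcurrentHashMap and do the check-and-reopen inside compute(), which runs atomically per key. NamedResourceCache, opener and isUsable are hypothetical names used only for illustration; they are not part of this commit. The resource type is generic so the sketch runs without a JDBC driver.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper (not in the commit): one cached resource per configuration name.
class NamedResourceCache<R> {
  private final Map<String, R> resources = new ConcurrentHashMap<>();
  private final Function<String, R> opener; // assumed to never return null
  private final Predicate<R> isUsable;

  NamedResourceCache(Function<String, R> opener, Predicate<R> isUsable) {
    this.opener = opener;
    this.isUsable = isUsable;
  }

  R get(String name) {
    // compute() is atomic per key, so two threads cannot both open a
    // replacement resource for the same name.
    return resources.compute(name, (key, existing) ->
        existing != null && isUsable.test(existing) ? existing : opener.apply(key));
  }

  int size() {
    return resources.size();
  }
}
```

With real connections, the opener would have to wrap the checked SQLException thrown by DriverManager.getConnection, and the usability check would wrap Connection.isClosed(). Note the trade-off: the opener runs while the map entry is locked, so a slow connect blocks other callers asking for the same name.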
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java
deleted file mode 100644
index 167a079..0000000
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParser.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- *  * agreements. See the NOTICE file distributed with this work for additional information regarding
- *  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance with the License. You may obtain a
- *  * copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software distributed under the License
- *  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- *  * or implied. See the License for the specific language governing permissions and limitations under
- *  * the License.
- *
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Function;
-
-class JDBCPropertyParser {
-  private final Properties properties;
-
-  private static final String PROPERTY_PREFIX_SEPARATOR = "-";
-  private static final String COMPOUND_VALUE_SEPARATOR = ":";
-
-
-  JDBCPropertyParser(Properties properties) {
-    this.properties = properties;
-  }
-
-  <V> Map<String, V> getPropertiesMap(String propertyPrefix, Function<String, V> valueParser) {
-    Map<String, V> map = new HashMap<>();
-    for(String propertyName : properties.stringPropertyNames()) {
-      if (propertyName.startsWith(propertyPrefix)) {
-        V value = valueParser.apply(properties.getProperty(propertyName));
-        int prefixEnd = propertyName.indexOf(PROPERTY_PREFIX_SEPARATOR);
-        String key = propertyName.substring(prefixEnd + 1);
-        map.put(key, value);
-      }
-    }
-    return map;
-  }
-}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java
new file mode 100644
index 0000000..ba8fe6f
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMapping.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class JDBCRegionMapping {
+  private String regionName;
+  private String pdxClassName;
+  private String tableName;
+  private String connectionConfigName;
+  private boolean primaryKeyInValue;
+  private Map<String, String> fieldToColumnMap = new HashMap<>();
+
+  void setConnectionConfigName(String connectionConfigName) {
+    this.connectionConfigName = connectionConfigName;
+  }
+
+  String getConnectionConfigName() {
+    return connectionConfigName;
+  }
+
+  String getRegionName() {
+    return regionName;
+  }
+
+  void setRegionName(String regionName) {
+    this.regionName = regionName;
+  }
+
+  String getPdxClassName() {
+    return pdxClassName;
+  }
+
+  void setPdxClassName(String pdxClassName) {
+    this.pdxClassName = pdxClassName;
+  }
+
+  String getTableName() {
+    return tableName;
+  }
+
+  void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  boolean isPrimaryKeyInValue() {
+    return primaryKeyInValue;
+  }
+
+  void setPrimaryKeyInValue(String primaryKeyInValue) {
+    this.primaryKeyInValue = Boolean.parseBoolean(primaryKeyInValue);
+  }
+
+  String getColumnNameForField(String fieldName) {
+    String columnName = fieldToColumnMap.get(fieldName);
+    return columnName != null ? columnName : fieldName;
+  }
+
+  void addFieldToColumnMapping(String fieldName, String columnMapping) {
+    fieldToColumnMap.put(fieldName, columnMapping);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
new file mode 100644
index 0000000..7758a7e
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.geode.cache.Operation;
+
+class PreparedStatementCache {
+
+  private SqlStatementFactory statementFactory = new SqlStatementFactory();
+  // TODO: if connection lost, we will still keep the statement. Make LRU?
+  private Map<StatementKey, PreparedStatement> statements = new HashMap<>();
+
+  PreparedStatement getPreparedStatement(Connection connection, List<ColumnValue> columnList,
+      String tableName, Operation operation, int pdxTypeId) {
+    StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
+    return statements.computeIfAbsent(key, k -> {
+      String sqlStr = getSqlString(tableName, columnList, operation);
+      PreparedStatement statement = null;
+      try {
+        statement = connection.prepareStatement(sqlStr);
+      } catch (SQLException e) {
+        handleSQLException(e);
+      }
+      return statement;
+    });
+  }
+
+  private String getSqlString(String tableName, List<ColumnValue> columnList, Operation operation) {
+    if (operation.isCreate()) {
+      return statementFactory.createInsertSqlString(tableName, columnList);
+    } else if (operation.isUpdate()) {
+      return statementFactory.createUpdateSqlString(tableName, columnList);
+    } else if (operation.isDestroy()) {
+      return statementFactory.createDestroySqlString(tableName, columnList);
+    } else if (operation.isGet()) {
+      return statementFactory.createSelectQueryString(tableName, columnList);
+    } else {
+      throw new IllegalArgumentException("unsupported operation " + operation);
+    }
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+
+  private static class StatementKey {
+    private final int pdxTypeId;
+    private final Operation operation;
+    private final String tableName;
+
+    StatementKey(int pdxTypeId, Operation operation, String tableName) {
+      this.pdxTypeId = pdxTypeId;
+      this.operation = operation;
+      this.tableName = tableName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      StatementKey that = (StatementKey) o;
+
+      if (pdxTypeId != that.pdxTypeId) {
+        return false;
+      }
+      if (operation != null ? !operation.equals(that.operation) : that.operation != null) {
+        return false;
+      }
+      return tableName != null ? tableName.equals(that.tableName) : that.tableName == null;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = pdxTypeId;
+      result = 31 * result + (operation != null ? operation.hashCode() : 0);
+      result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+      return result;
+    }
+  }
+}
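
Reviewer note on the "Make LRU?" TODO in PreparedStatementCache above. A minimal sketch of one way to bound such a cache, assuming a LinkedHashMap in access order with removeEldestEntry overridden. LruCache and onEvict are hypothetical names, not part of this commit; the eviction callback stands in for closing an evicted PreparedStatement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical bounded cache (not in the commit). Not thread-safe on its own.
class LruCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxEntries;
  private final Consumer<V> onEvict;

  LruCache(int maxEntries, Consumer<V> onEvict) {
    super(16, 0.75f, true); // true = iterate in access order, least recent first
    this.maxEntries = maxEntries;
    this.onEvict = onEvict;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    if (size() > maxEntries) {
      onEvict.accept(eldest.getValue()); // e.g. close the evicted statement
      return true;
    }
    return false;
  }
}
```

This only bounds the number of cached statements. It does not by itself address the other half of the TODO: StatementKey does not include the connection, so a statement prepared on a connection that is later lost would still be returned until it is evicted.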
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java
new file mode 100644
index 0000000..a214968
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SQLHandler.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+
+public class SQLHandler {
+  private JDBCManager manager;
+
+  public SQLHandler(JDBCManager manager) {
+    this.manager = manager;
+  }
+
+  public PdxInstance read(Region region, Object key) {
+    if (key == null) {
+      throw new IllegalArgumentException("Key for query cannot be null");
+    }
+
+    JDBCRegionMapping regionMapping = manager.getMappingForRegion(region.getName());
+    JDBCConnectionConfiguration connectionConfig =
+        manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+
+    List<ColumnValue> columnList =
+        manager.getColumnToValueList(connectionConfig, regionMapping, key, null, Operation.GET);
+    String tableName = regionMapping.getTableName();
+    PreparedStatement statement = manager.getPreparedStatement(
+        manager.getConnection(connectionConfig), columnList, tableName, Operation.GET, 0);
+    PdxInstanceFactory factory = getPdxInstanceFactory(region, regionMapping);
+    String keyColumnName = manager.getKeyColumnName(connectionConfig, tableName);
+    return executeReadStatement(statement, columnList, factory, regionMapping, keyColumnName);
+  }
+
+  private PdxInstanceFactory getPdxInstanceFactory(Region region, JDBCRegionMapping regionMapping) {
+    InternalCache cache = (InternalCache) region.getRegionService();
+    String valueClassName = regionMapping.getPdxClassName();
+    PdxInstanceFactory factory;
+    if (valueClassName != null) {
+      factory = cache.createPdxInstanceFactory(valueClassName);
+    } else {
+      factory = cache.createPdxInstanceFactory("no class", false);
+    }
+    return factory;
+  }
+
+  private PdxInstance executeReadStatement(PreparedStatement statement,
+      List<ColumnValue> columnList, PdxInstanceFactory factory, JDBCRegionMapping regionMapping,
+      String keyColumnName) {
+    PdxInstance pdxInstance = null;
+    synchronized (statement) {
+      try {
+        int idx = 0;
+        for (ColumnValue columnValue : columnList) {
+          idx++;
+          statement.setObject(idx, columnValue.getValue());
+        }
+        ResultSet resultSet = statement.executeQuery();
+        if (resultSet.next()) {
+
+          ResultSetMetaData metaData = resultSet.getMetaData();
+          int ColumnsNumber = metaData.getColumnCount();
+          for (int i = 1; i <= ColumnsNumber; i++) {
+            Object columnValue = resultSet.getObject(i);
+            String columnName = metaData.getColumnName(i);
+            String fieldName = mapColumnNameToFieldName(columnName, regionMapping.getTableName());
+            if (regionMapping.isPrimaryKeyInValue()
+                || !keyColumnName.equalsIgnoreCase(columnName)) {
+              factory.writeField(fieldName, columnValue, Object.class);
+            }
+          }
+          if (resultSet.next()) {
+            throw new IllegalStateException(
+                "Multiple rows returned for query: " + resultSet.getStatement().toString());
+          }
+          pdxInstance = factory.create();
+        }
+      } catch (SQLException e) {
+        handleSQLException(e);
+      } finally {
+        clearStatementParameters(statement);
+      }
+    }
+    return pdxInstance;
+  }
+
+  private String mapColumnNameToFieldName(String columnName, String tableName) {
+    // TODO check config for mapping
+    return columnName.toLowerCase();
+  }
+
+  public void write(Region region, Operation operation, Object key, PdxInstance value) {
+    if (value == null && operation != Operation.DESTROY) {
+      throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
+    }
+    JDBCRegionMapping regionMapping = manager.getMappingForRegion(region.getName());
+    final String tableName = regionMapping.getTableName();
+    JDBCConnectionConfiguration connectionConfig =
+        manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+    List<ColumnValue> columnList =
+        manager.getColumnToValueList(connectionConfig, regionMapping, key, value, operation);
+
+    int pdxTypeId = 0;
+    if (value != null) {
+      pdxTypeId = ((PdxInstanceImpl) value).getPdxType().getTypeId();
+    }
+    PreparedStatement statement = manager.getPreparedStatement(
+        manager.getConnection(connectionConfig), columnList, tableName, operation, pdxTypeId);
+    int updateCount = executeWriteStatement(statement, columnList, operation, false);
+    if (operation.isDestroy()) {
+      // TODO: should we check updateCount here? Probably not. It is possible we have nothing in the
+      // table to destroy.
+      return;
+    }
+    if (updateCount <= 0) {
+      Operation upsertOp;
+      if (operation.isUpdate()) {
+        upsertOp = Operation.CREATE;
+      } else {
+        upsertOp = Operation.UPDATE;
+      }
+      PreparedStatement upsertStatement = manager.getPreparedStatement(
+          manager.getConnection(connectionConfig), columnList, tableName, upsertOp, pdxTypeId);
+      updateCount = executeWriteStatement(upsertStatement, columnList, upsertOp, true);
+    }
+    if (updateCount != 1) {
+      throw new IllegalStateException("Unexpected updateCount " + updateCount);
+    }
+  }
+
+  private int executeWriteStatement(PreparedStatement statement, List<ColumnValue> columnList,
+      Operation operation, boolean handleException) {
+    synchronized (statement) {
+      try {
+        int idx = 0;
+        for (ColumnValue columnValue : columnList) {
+          idx++;
+          statement.setObject(idx, columnValue.getValue());
+        }
+        return statement.executeUpdate();
+      } catch (SQLException e) {
+        if (handleException || operation.isDestroy()) {
+          handleSQLException(e);
+        }
+        return 0;
+      } finally {
+        clearStatementParameters(statement);
+      }
+    }
+  }
+
+  private void clearStatementParameters(PreparedStatement statement) {
+    try {
+      statement.clearParameters();
+    } catch (SQLException ignore) {
+    }
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+}
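
Reviewer note on SQLHandler.write above. The control flow is: run the requested operation; if it affected no rows (or failed, since the first attempt is executed with handleException set to false), retry once with the opposite operation; then require exactly one affected row. The sketch below restates only that flow with the database call abstracted away. UpsertSketch, Op and the executor parameter are hypothetical names, not part of this commit, and destroy handling is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical restatement of the create/update fallback (not the commit's code).
class UpsertSketch {
  enum Op { CREATE, UPDATE }

  // executor returns the affected-row count; 0 stands for "no row" or a swallowed failure.
  static List<Op> write(Op first, ToIntFunction<Op> executor) {
    List<Op> attempted = new ArrayList<>();
    attempted.add(first);
    int count = executor.applyAsInt(first);
    if (count <= 0) {
      // An UPDATE that touched nothing falls back to CREATE, and vice versa.
      Op fallback = first == Op.UPDATE ? Op.CREATE : Op.UPDATE;
      attempted.add(fallback);
      count = executor.applyAsInt(fallback);
    }
    if (count != 1) {
      throw new IllegalStateException("Unexpected updateCount " + count);
    }
    return attempted;
  }
}
```

The two statements are separate round trips, so this is not atomic. Whether a concurrent writer can slip in between the two attempts depends on how callers are serialized, which this diff does not show.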
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
new file mode 100644
index 0000000..1b0da56
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.List;
+
+class SqlStatementFactory {
+
+  String createSelectQueryString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    return "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+  }
+
+  String createDestroySqlString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    return "DELETE FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+  }
+
+  String createUpdateSqlString(String tableName, List<ColumnValue> columnList) {
+    StringBuilder query = new StringBuilder("UPDATE " + tableName + " SET ");
+    int idx = 0;
+    for (ColumnValue column : columnList) {
+      if (!column.isKey()) {
+        idx++;
+        if (idx > 1) {
+          query.append(", ");
+        }
+        query.append(column.getColumnName());
+        query.append(" = ?");
+      }
+    }
+    for (ColumnValue column : columnList) {
+      if (column.isKey()) {
+        query.append(" WHERE ");
+        query.append(column.getColumnName());
+        query.append(" = ?");
+        // currently only support simple primary key with one column
+        break;
+      }
+    }
+    return query.toString();
+  }
+
+  String createInsertSqlString(String tableName, List<ColumnValue> columnList) {
+    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + " (");
+    StringBuilder columnValues = new StringBuilder(" VALUES (");
+    int columnCount = columnList.size();
+    int idx = 0;
+    for (ColumnValue column : columnList) {
+      idx++;
+      columnNames.append(column.getColumnName());
+      columnValues.append('?');
+      if (idx != columnCount) {
+        columnNames.append(", ");
+        columnValues.append(",");
+      }
+    }
+    columnNames.append(")");
+    columnValues.append(")");
+    return columnNames.append(columnValues).toString();
+  }
+}
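
Reviewer note on SqlStatementFactory above. To make the generated SQL easier to picture, here is a small standalone sketch that builds the same INSERT and UPDATE shapes for a sample column list. SqlSketch and Column are hypothetical stand-ins for SqlStatementFactory and ColumnValue; they are not the commit's classes, and the table and column names are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of the INSERT/UPDATE string shapes (not the commit's code).
class SqlSketch {
  static final class Column {
    final boolean key;
    final String name;

    Column(boolean key, String name) {
      this.key = key;
      this.name = name;
    }
  }

  static String insert(String table, List<Column> columns) {
    List<String> names = new ArrayList<>();
    List<String> marks = new ArrayList<>();
    for (Column column : columns) {
      names.add(column.name);
      marks.add("?");
    }
    return "INSERT INTO " + table + " (" + String.join(", ", names) + ") VALUES ("
        + String.join(",", marks) + ")";
  }

  static String update(String table, List<Column> columns) {
    List<String> assignments = new ArrayList<>();
    String where = "";
    for (Column column : columns) {
      if (!column.key) {
        assignments.add(column.name + " = ?");
      } else if (where.isEmpty()) {
        where = " WHERE " + column.name + " = ?"; // single-column key only
      }
    }
    return "UPDATE " + table + " SET " + String.join(", ", assignments) + where;
  }
}
```

For columns name, age and key column id on a table called person, this yields "INSERT INTO person (name, age, id) VALUES (?,?,?)" and "UPDATE person SET name = ?, age = ? WHERE id = ?". The UPDATE placeholders are bound in list order by executeWriteStatement, so they line up only because getColumnToValueList appends the key column last.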
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 966cde5..1a228a6 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -29,6 +29,9 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.JDBCAsyncWriter;
+import org.apache.geode.connectors.jdbc.internal.JDBCConfigurationService;
+import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxReader;
@@ -319,8 +322,8 @@ public class JDBCAsyncWriterIntegrationTest {
   }
 
   private Region createRegionWithJDBCAsyncWriter(String regionName, Properties props) {
-    jdbcWriter = new JDBCAsyncWriter();
-    jdbcWriter.init(props);
+    jdbcWriter = new JDBCAsyncWriter(createManager());
+    // jdbcWriter.init(props);
     cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
         .create("jdbcAsyncQueue", jdbcWriter);
 
@@ -351,4 +354,8 @@ public class JDBCAsyncWriterIntegrationTest {
     }
   }
 
+  private JDBCManager createManager() {
+    return new JDBCManager(TestConfigService.getTestConfigService());
+  }
+
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java
index 52314d0..ac9508c 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCLoaderIntegrationTest.java
@@ -27,6 +27,8 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -90,7 +92,7 @@ public class JDBCLoaderIntegrationTest {
   }
 
   private Region createRegionWithJDBCLoader(String regionName, Properties props) {
-    this.jdbcLoader = new JDBCLoader<>();
+    this.jdbcLoader = new JDBCLoader<>(createManager());
     this.jdbcLoader.init(props);
     RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
     rf.setCacheLoader(jdbcLoader);
@@ -137,4 +139,7 @@ public class JDBCLoaderIntegrationTest {
     }
   }
 
+  private JDBCManager createManager() {
+    return new JDBCManager(TestConfigService.getTestConfigService());
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java
index fbdce89..a8162eb 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCSynchronousWriterIntegrationTest.java
@@ -33,6 +33,8 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.JDBCManager;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxReader;
@@ -318,7 +320,7 @@ public class JDBCSynchronousWriterIntegrationTest {
   }
 
   private Region createRegionWithJDBCSynchronousWriter(String regionName, Properties props) {
-    jdbcWriter = new JDBCSynchronousWriter();
+    jdbcWriter = new JDBCSynchronousWriter(createManager());
     jdbcWriter.init(props);
 
     RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
@@ -348,4 +350,8 @@ public class JDBCSynchronousWriterIntegrationTest {
     }
   }
 
+  private JDBCManager createManager() {
+    return new JDBCManager(TestConfigService.getTestConfigService());
+  }
+
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
new file mode 100644
index 0000000..6a75199
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ColumnValueTest {
+  private static final String COLUMN_NAME = "columnName";
+  private static final Object VALUE = new Object();
+
+  private ColumnValue value;
+
+  @Before
+  public void setup() {
+    value = new ColumnValue(true, COLUMN_NAME, VALUE);
+  }
+
+  @Test
+  public void isKeyReturnsCorrectValue() {
+    assertThat(value.isKey()).isTrue();
+  }
+
+  @Test
+  public void hasCorrectColumnName() {
+    assertThat(value.getColumnName()).isEqualTo(COLUMN_NAME);
+  }
+
+  @Test
+  public void hasCorrectValue() {
+    assertThat(value.getValue()).isSameAs(VALUE);
+  }
+
+  @Test
+  public void toStringContainsAllVariables() {
+    assertThat(value.toString()).contains(Boolean.toString(true)).contains(COLUMN_NAME)
+        .contains(VALUE.toString());
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java
new file mode 100644
index 0000000..ca1df04
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationServiceTest.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+public class JDBCConfigurationServiceTest {
+  private static final String TEST_CONFIG_NAME = "testConfig";
+  private static final String TEST_REGION_NAME = "testRegion";
+
+  private JDBCConfigurationService service = new JDBCConfigurationService();
+
+  @Test
+  public void returnsNoConfigIfEmpty() {
+    assertThat(service.getConnectionConfig("foo")).isNull();
+  }
+
+  @Test
+  public void returnsNoMappingIfEmpty() {
+    assertThat(service.getMappingForRegion("foo")).isNull();
+  }
+
+  @Test
+  public void returnsCorrectConfig() {
+    JDBCConnectionConfiguration config = mock(JDBCConnectionConfiguration.class);
+    when(config.getName()).thenReturn(TEST_CONFIG_NAME);
+    service.addOrUpdateConnectionConfig(config);
+
+    assertThat(service.getConnectionConfig(TEST_CONFIG_NAME)).isSameAs(config);
+  }
+
+  @Test
+  public void doesNotReturnConfigWithDifferentName() {
+    JDBCConnectionConfiguration config = mock(JDBCConnectionConfiguration.class);
+    when(config.getName()).thenReturn("theOtherConfig");
+    service.addOrUpdateConnectionConfig(config);
+
+    assertThat(service.getConnectionConfig(TEST_CONFIG_NAME)).isNull();
+  }
+
+  @Test
+  public void returnsCorrectMapping() {
+    JDBCRegionMapping mapping = mock(JDBCRegionMapping.class);
+    when(mapping.getRegionName()).thenReturn(TEST_REGION_NAME);
+    service.addOrUpdateRegionMapping(mapping);
+
+    assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isSameAs(mapping);
+  }
+
+  @Test
+  public void doesNotReturnMappingForDifferentRegion() {
+    JDBCRegionMapping mapping = mock(JDBCRegionMapping.class);
+    when(mapping.getRegionName()).thenReturn("theOtherMapping");
+    service.addOrUpdateRegionMapping(mapping);
+
+    assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isNull();
+  }
+}
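Reviewer note on JDBCConfigurationServiceTest: the tests above pin down a simple contract. A lookup on an empty service returns null, addOrUpdate registers an entry under the name it reports, and a lookup under a different name returns null. The sketch below shows one way such a contract could be met with two maps. It is an assumption for illustration only: the real JDBCConfigurationService is not shown in this part of the diff, and the nested ConnectionConfig and RegionMapping types are hypothetical stand-ins for JDBCConnectionConfiguration and JDBCRegionMapping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical map-backed service that satisfies the behavior the tests describe.
final class ConfigServiceSketch {

  // Stand-in for JDBCConnectionConfiguration; only the name is needed here.
  static final class ConnectionConfig {
    private final String name;

    ConnectionConfig(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }
  }

  // Stand-in for JDBCRegionMapping; only the region name is needed here.
  static final class RegionMapping {
    private final String regionName;

    RegionMapping(String regionName) {
      this.regionName = regionName;
    }

    String getRegionName() {
      return regionName;
    }
  }

  private final Map<String, ConnectionConfig> connectionsByName = new ConcurrentHashMap<>();
  private final Map<String, RegionMapping> mappingsByRegion = new ConcurrentHashMap<>();

  // Registers the config under its own name, replacing any previous entry.
  void addOrUpdateConnectionConfig(ConnectionConfig config) {
    connectionsByName.put(config.getName(), config);
  }

  // Registers the mapping under its region name, replacing any previous entry.
  void addOrUpdateRegionMapping(RegionMapping mapping) {
    mappingsByRegion.put(mapping.getRegionName(), mapping);
  }

  // Returns null when no config with that name has been registered.
  ConnectionConfig getConnectionConfig(String name) {
    return connectionsByName.get(name);
  }

  // Returns null when no mapping exists for that region.
  RegionMapping getMappingForRegion(String regionName) {
    return mappingsByRegion.get(regionName);
  }
}
```

Keying each map by the name the object itself reports is what lets the tests stub only getName() or getRegionName() on a mock and still exercise the lookup path.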
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
deleted file mode 100644
index 8e557f5..0000000
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConfigurationUnitTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import static org.assertj.core.api.Assertions.*;
-
-import java.util.Properties;
-
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
-import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.*;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-
-@Category(UnitTest.class)
-public class JDBCConfigurationUnitTest {
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testMissingAllRequiredProperties() {
-    Properties props = new Properties();
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("missing required properties: [url]");
-    new JDBCConfiguration(props);
-  }
-
-  @Test
-  public void testURLProperty() {
-    Properties props = new Properties();
-    props.setProperty("url", "myUrl");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getURL()).isEqualTo("myUrl");
-  }
-
-  @Test
-  public void testDefaultUser() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getUser()).isNull();
-  }
-
-  @Test
-  public void testDefaultPassword() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getPassword()).isNull();
-  }
-
-  @Test
-  public void testUser() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("user", "myUser");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getUser()).isEqualTo("myUser");
-  }
-
-  @Test
-  public void testPassword() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("password", "myPassword");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getPassword()).isEqualTo("myPassword");
-  }
-
-  @Test
-  public void testDefaultValueClassName() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getValueClassName("foo")).isNull();
-  }
-
-  @Test
-  public void testValueClassNameWithRegionNames() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("valueClassName-reg1", "cn1");
-    props.setProperty("valueClassName-reg2", "pack2.cn2");
-    props.setProperty("valueClassName-foo", "myPackage.myDomainClass");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getValueClassName("foo")).isEqualTo("myPackage.myDomainClass");
-    assertThat(config.getValueClassName("reg1")).isEqualTo("cn1");
-    assertThat(config.getValueClassName("reg2")).isEqualTo("pack2.cn2");
-  }
-
-  @Test
-  public void testDefaultIsKeyPartOfValue() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(false);
-  }
-
-  @Test
-  public void testIsKeyPartOfValueWithRegionNames() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("isKeyPartOfValue-reg1", "true");
-    props.setProperty("isKeyPartOfValue-reg2", "false");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getIsKeyPartOfValue("foo")).isEqualTo(false);
-    assertThat(config.getIsKeyPartOfValue("reg1")).isEqualTo(true);
-    assertThat(config.getIsKeyPartOfValue("reg2")).isEqualTo(false);
-  }
-
-  @Test
-  public void testDefaultRegionToTableMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getTableForRegion("foo")).isEqualTo("foo");
-  }
-
-  @Test
-  public void testRegionToTableMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("regionToTable-reg1", "table1");
-    props.setProperty("regionToTable-reg2", "table2");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getTableForRegion("reg1")).isEqualTo("table1");
-    assertThat(config.getTableForRegion("reg2")).isEqualTo("table2");
-  }
-
-  @Test
-  public void testDefaultFieldToColumnMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getColumnForRegionField("reg1", "field1")).isEqualTo("field1");
-  }
-
-  @Test
-  public void testFieldToColumnMap() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn", "field1:column1");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getColumnForRegionField("reg1", "field1")).isEqualTo("column1");
-  }
-
-  @Test
-  public void testFieldToColumnMapWithMoreThanOne() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn",
-        "reg0:field2:othercolumn2, reg1:field1:column1, field2:column2, reg3:field1:othercolumn1");
-    JDBCConfiguration config = new JDBCConfiguration(props);
-    assertThat(config.getColumnForRegionField("reg1", "field1")).isEqualTo("column1");
-    assertThat(config.getColumnForRegionField("reg3", "field1")).isEqualTo("othercolumn1");
-    assertThat(config.getColumnForRegionField("reg0", "field2")).isEqualTo("othercolumn2");
-    assertThat(config.getColumnForRegionField("regAny", "field2")).isEqualTo("column2");
-    assertThat(config.getColumnForRegionField("regOther", "field2")).isEqualTo("column2");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyDuplicateFieldThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn", "field1:column1, field1:column2");
-    new JDBCConfiguration(props);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void verifyDuplicateRegionFieldThrows() {
-    Properties props = new Properties();
-    props.setProperty("url", "");
-    props.setProperty("fieldToColumn", "reg1:field1:column1, reg1:field1:column2");
-    new JDBCConfiguration(props);
-  }
-}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java
new file mode 100644
index 0000000..186ba13
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCConnectionConfigurationTest.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class JDBCConnectionConfigurationTest {
+  private JDBCConnectionConfiguration config = new JDBCConnectionConfiguration();
+
+  @Test
+  public void initiatedWithNullValues() {
+    assertThat(config.getName()).isNull();
+    assertThat(config.getUrl()).isNull();
+    assertThat(config.getUser()).isNull();
+    assertThat(config.getPassword()).isNull();
+  }
+
+  @Test
+  public void hasCorrectName() {
+    String name = "name";
+    config.setName(name);
+    assertThat(config.getName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectUrl() {
+    String url = "url";
+    config.setUrl(url);
+    assertThat(config.getUrl()).isEqualTo(url);
+  }
+
+  @Test
+  public void hasCorrectUser() {
+    String user = "user";
+    config.setUser(user);
+    assertThat(config.getUser()).isEqualTo(user);
+  }
+
+  @Test
+  public void hasCorrectPassword() {
+    String password = "password";
+    config.setPassword(password);
+    assertThat(config.getPassword()).isEqualTo(password);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
index 0e9d85b..f9a81e1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCManagerUnitTest.java
@@ -14,652 +14,557 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.*;
-import static com.googlecode.catchexception.CatchException.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
 
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.ArgumentCaptor;
 
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Region;
-import org.apache.geode.connectors.jdbc.internal.JDBCConfiguration;
-import org.apache.geode.connectors.jdbc.internal.JDBCManager;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxInstanceFactory;
-import org.apache.geode.pdx.internal.PdxInstanceImpl;
-import org.apache.geode.pdx.internal.PdxType;
-import org.apache.geode.test.fake.Fakes;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class JDBCManagerUnitTest {
 
-  private JDBCManager mgr;
-  private String regionName = "jdbcRegion";
-  Connection connection;
-  PreparedStatement preparedStatement;
-  PreparedStatement preparedStatement2;
-
-  private static final String ID_COLUMN_NAME = "ID";
-  private static final String NAME_COLUMN_NAME = "name";
-  private static final String AGE_COLUMN_NAME = "age";
-
-  public class TestableUpsertJDBCManager extends JDBCManager {
-
-    final int upsertReturn;
-
-    TestableUpsertJDBCManager(JDBCConfiguration config) {
-      super(config);
-      upsertReturn = 1;
-    }
-
-    TestableUpsertJDBCManager(JDBCConfiguration config, int upsertReturn) {
-      super(config);
-      this.upsertReturn = upsertReturn;
-    }
-
-    @Override
-    protected Connection createConnection(String url, String user, String password)
-        throws SQLException {
-      ResultSet rsKeys = mock(ResultSet.class);
-      when(rsKeys.next()).thenReturn(true, false);
-      when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
-
-      ResultSet rs = mock(ResultSet.class);
-      when(rs.next()).thenReturn(true, false);
-      when(rs.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
-
-      DatabaseMetaData metaData = mock(DatabaseMetaData.class);
-      when(metaData.getPrimaryKeys(null, null, regionName.toUpperCase())).thenReturn(rsKeys);
-      when(metaData.getTables(any(), any(), any(), any())).thenReturn(rs);
-
-      preparedStatement = mock(PreparedStatement.class);
-      preparedStatement2 = mock(PreparedStatement.class);
-      when(preparedStatement.getUpdateCount()).thenReturn(0);
-      when(preparedStatement2.getUpdateCount()).thenReturn(this.upsertReturn);
-
-      connection = mock(Connection.class);
-      when(connection.getMetaData()).thenReturn(metaData);
-      when(connection.prepareStatement(any())).thenReturn(preparedStatement, preparedStatement2);
-
-      return connection;
-    }
-  }
-
-  public class TestableJDBCManagerWithResultSets extends JDBCManager {
-
-    private ResultSet tableResults;
-    private ResultSet primaryKeyResults;
-    private ResultSet queryResultSet;
-    private String tableName;
-
-    TestableJDBCManagerWithResultSets(String tableName, JDBCConfiguration config,
-        ResultSet tableResults, ResultSet primaryKeyResults, ResultSet queryResultSet) {
-      super(config);
-      this.tableResults = tableResults;
-      this.primaryKeyResults = primaryKeyResults;
-      this.queryResultSet = queryResultSet;
-      this.tableName = tableName;
-    }
-
-    @Override
-    protected Connection createConnection(String url, String user, String password)
-        throws SQLException {
-      if (primaryKeyResults == null) {
-        primaryKeyResults = mock(ResultSet.class);
-        when(primaryKeyResults.next()).thenReturn(true, false);
-        when(primaryKeyResults.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
-      }
-
-      if (tableResults == null) {
-        tableResults = mock(ResultSet.class);
-        when(tableResults.next()).thenReturn(true, false);
-        when(tableResults.getString("TABLE_NAME")).thenReturn(this.tableName.toUpperCase());
-      }
+  private static final String REGION_NAME = "testRegion";
+  private static final String CONFIG_NAME = "configName";
 
-      DatabaseMetaData metaData = mock(DatabaseMetaData.class);
-      when(metaData.getPrimaryKeys(null, null, this.tableName.toUpperCase()))
-          .thenReturn(primaryKeyResults);
-      when(metaData.getTables(any(), any(), any(), any())).thenReturn(tableResults);
+  private JDBCConfigurationService configService;
+  private JDBCManager manager;
 
-      preparedStatement = mock(PreparedStatement.class);
-      when(preparedStatement.getUpdateCount()).thenReturn(1);
-
-
-      if (this.queryResultSet == null) {
-        queryResultSet = mock(ResultSet.class);
-        ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
-        when(rsmd.getColumnCount()).thenReturn(3);
-        when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
-        when(queryResultSet.getMetaData()).thenReturn(rsmd);
-        when(queryResultSet.next()).thenReturn(true, false);
-        when(queryResultSet.getObject(anyInt())).thenReturn("1", "Emp1", 21);
-      }
-
-      when(preparedStatement.executeQuery()).thenReturn(queryResultSet);
-
-      connection = mock(Connection.class);
-      when(connection.getMetaData()).thenReturn(metaData);
-      when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-
-      return connection;
-    }
-  }
 
   @Before
-  public void setUp() throws Exception {}
-
-  @After
-  public void tearDown() throws Exception {}
-
-  private void createManager(String... args) throws SQLException {
-    createManagerWithTableName(regionName, args);
-  }
-
-  private void createManagerWithTableName(String tableName, String... args) throws SQLException {
-    ResultSet rsKeys = mock(ResultSet.class);
-    when(rsKeys.next()).thenReturn(true, false);
-    when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
-
-    ResultSet rs = mock(ResultSet.class);
-    when(rs.next()).thenReturn(true, false);
-    when(rs.getString("TABLE_NAME")).thenReturn(tableName.toUpperCase());
-
-    this.mgr = new TestableJDBCManagerWithResultSets(tableName, createConfiguration(args), rs,
-        rsKeys, null);
-  }
-
-  private void createDefaultManager() throws SQLException {
-    createManager();
-  }
-
-  private void createUpsertManager() {
-    this.mgr = new TestableUpsertJDBCManager(createConfiguration());
-  }
-
-  private void createUpsertManager(int upsertReturn) {
-    this.mgr = new TestableUpsertJDBCManager(createConfiguration(), upsertReturn);
-  }
-
-  private void createManager(ResultSet tableNames, ResultSet primaryKeys, ResultSet queryResultSet,
-      String... args) {
-    this.mgr = new TestableJDBCManagerWithResultSets(regionName, createConfiguration(args),
-        tableNames, primaryKeys, queryResultSet);
-  }
-
-  private JDBCConfiguration createConfiguration(String... args) {
-    Properties props = new Properties();
-    props.setProperty("url", "fakeURL");
-    String[] argsArray = args;
-    if (argsArray != null) {
-      assert argsArray.length % 2 == 0;
-      for (int i = 0; i < argsArray.length; i += 2) {
-        props.setProperty(argsArray[i], argsArray[i + 1]);
-      }
-    }
-    return new JDBCConfiguration(props);
+  public void setup() {
+    configService = mock(JDBCConfigurationService.class);
+    manager = new JDBCManager(configService);
   }
 
   @Test
-  public void verifySimpleCreateCallsExecute() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
-        + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
+  public void getsCorrectMapping() {
+    manager.getMappingForRegion(REGION_NAME);
+    verify(configService).getMappingForRegion(REGION_NAME);
   }
 
   @Test
-  public void verifySimpleCreateWithIdField() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21, true);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
-        + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
+  public void getsCorrectConnectionConfig() {
+    manager.getConnectionConfig(CONFIG_NAME);
+    verify(configService).getConnectionConfig(CONFIG_NAME);
   }
 
   @Test
-  public void verifySimpleUpdateCallsExecute() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.UPDATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
+  public void retrievesANewConnection() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    Connection returnedConnection = spyManager.getConnection(connectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
   }
 
   @Test
-  public void verifySimpleDestroyCallsExecute() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    this.mgr.write(region, Operation.DESTROY, "1", null);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("DELETE FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
+  public void retrievesSameConnectionForSameConnectionConfig() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    Connection returnedConnection = spyManager.getConnection(connectionConfig);
+    Connection secondReturnedConnection = spyManager.getConnection(connectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+    assertThat(secondReturnedConnection).isNotNull().isSameAs(connection);
   }
 
   @Test
-  public void verifyTwoCreatesReuseSameStatement() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    PdxInstanceImpl pdx2 = mockPdxInstance("Emp2", 55);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    this.mgr.write(region, Operation.CREATE, "2", pdx2);
-    verify(this.preparedStatement, times(2)).execute();
-    verify(this.connection).prepareStatement(any());
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(6)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    assertThat(allObjects.get(3)).isEqualTo("Emp2");
-    assertThat(allObjects.get(4)).isEqualTo(55);
-    assertThat(allObjects.get(5)).isEqualTo("2");
-  }
-
-  private PdxInstanceImpl mockPdxInstance(String name, int age) {
-    return mockPdxInstance(name, age, false);
-  }
-
-  private PdxInstanceImpl mockPdxInstance(String name, int age, boolean addId) {
-    PdxInstanceImpl pdxInstance = mock(PdxInstanceImpl.class);
-    if (addId) {
-      when(pdxInstance.getFieldNames())
-          .thenReturn(Arrays.asList(NAME_COLUMN_NAME, AGE_COLUMN_NAME, ID_COLUMN_NAME));
-      when(pdxInstance.getField(ID_COLUMN_NAME)).thenReturn("bogusId");
-    } else {
-      when(pdxInstance.getFieldNames())
-          .thenReturn(Arrays.asList(NAME_COLUMN_NAME, AGE_COLUMN_NAME));
-    }
-    when(pdxInstance.getField(NAME_COLUMN_NAME)).thenReturn(name);
-    when(pdxInstance.getField(AGE_COLUMN_NAME)).thenReturn(age);
-    PdxType pdxType = mock(PdxType.class);
-    when(pdxType.getTypeId()).thenReturn(1);
-    when(pdxInstance.getPdxType()).thenReturn(pdxType);
-    return pdxInstance;
+  public void retrievesDifferentConnectionForEachConfig() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    Connection secondConnection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    JDBCConnectionConfiguration secondConnectionConfig =
+        getTestConnectionConfig("newName", "url", null, null);
+
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    doReturn(secondConnection).when(spyManager).getSQLConnection(secondConnectionConfig);
+    Connection returnedConnection = spyManager.getConnection(connectionConfig);
+    Connection secondReturnedConnection = spyManager.getConnection(secondConnectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+    assertThat(secondReturnedConnection).isNotNull().isSameAs(secondConnection);
+    assertThat(returnedConnection).isNotSameAs(secondReturnedConnection);
   }
 
   @Test
-  public void verifySimpleInvalidateIsUnsupported() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    catchException(this.mgr).write(region, Operation.INVALIDATE, "1", pdx1);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage()).isEqualTo("unsupported operation INVALIDATE");
-  }
+  public void retrievesANewConnectionIfCachedOneIsClosed() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    when(connection.isClosed()).thenReturn(true);
+    Connection secondConnection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    spyManager.getConnection(connectionConfig);
 
-  @Test
-  public void verifyNoTableForRegion() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region("badRegion", cache);
-    catchException(this.mgr).write(region, Operation.DESTROY, "1", null);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("no table was found that matches badRegion");
+    doReturn(secondConnection).when(spyManager).getSQLConnection(connectionConfig);
+    Connection secondReturnedConnection = spyManager.getConnection(connectionConfig);
+    assertThat(secondReturnedConnection).isSameAs(secondConnection);
   }
 
   @Test
-  public void callClose() throws SQLException {
-    createDefaultManager();
-    this.mgr.close();
-  }
-
-  @Test
-  public void callCloseWithConnection() throws SQLException {
-    createDefaultManager();
-    this.mgr.getConnection(null, null);
-    this.mgr.close();
-  }
-
-  @Test
-  public void verifyInsertUpdate() throws SQLException {
-    createUpsertManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    verify(this.preparedStatement2).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture());
-    List<String> allArgs = sqlCaptor.getAllValues();
-    assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME + ", "
-        + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
-    allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
-
-  @Test
-  public void verifyUpdateInsert() throws SQLException {
-    createUpsertManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.UPDATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    verify(this.preparedStatement2).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture());
-    List<String> allArgs = sqlCaptor.getAllValues();
-    assertThat(allArgs.get(0)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    assertThat(allArgs.get(1)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME + ", "
-        + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
-    allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
-
-  @Test
-  public void verifyInsertUpdateThatUpdatesNothing() throws SQLException {
-    createUpsertManager(0);
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    catchException(this.mgr).write(region, Operation.CREATE, "1", pdx1);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage()).isEqualTo("Unexpected updateCount 0");
-    verify(this.preparedStatement).execute();
-    verify(this.preparedStatement2).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture());
-    List<String> allArgs = sqlCaptor.getAllValues();
-    assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME + ", "
-        + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
-        + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-    verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
-    allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
-
-  @Test
-  public void twoTablesOfSameName() throws SQLException {
-    ResultSet primaryKeys = null;
-    ResultSet tables = mock(ResultSet.class);
-    when(tables.next()).thenReturn(true, true, false);
-    when(tables.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
-    createManager(tables, primaryKeys, null);
-    catchException(this.mgr).computeKeyColumnName(regionName);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage()).isEqualTo("Duplicate tables that match region name");
-  }
-
-  @Test
-  public void noPrimaryKeyOnTable() throws SQLException {
-    ResultSet tables = null;
-    ResultSet primaryKeys = mock(ResultSet.class);
-    when(primaryKeys.next()).thenReturn(false);
-    createManager(tables, primaryKeys, null);
-    catchException(this.mgr).computeKeyColumnName(regionName);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("The table " + regionName + " does not have a primary key column.");
-  }
-
-  @Test
-  public void twoPrimaryKeysOnTable() throws SQLException {
-    ResultSet tables = null;
-    ResultSet primaryKeys = mock(ResultSet.class);
-    when(primaryKeys.next()).thenReturn(true, true, false);
-    createManager(tables, primaryKeys, null);
-    catchException(this.mgr).computeKeyColumnName(regionName);
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("The table " + regionName + " has more than one primary key column.");
-  }
-
-  @Test
-  public void verifyReadThatMissesReturnsNull() throws SQLException {
-    ResultSet queryResults = mock(ResultSet.class);
-    when(queryResults.next()).thenReturn(false);
-    createManager(null, null, queryResults);
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    assertThat(this.mgr.read(region, "2")).isNull();
-  }
-
-  @Test
-  public void verifyReadWithMultipleResultsFails() throws SQLException {
-    ResultSet queryResults = mock(ResultSet.class);
-    when(queryResults.next()).thenReturn(true, true, false);
-    ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
-    when(rsmd.getColumnCount()).thenReturn(3);
-    when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
-    when(queryResults.getMetaData()).thenReturn(rsmd);
-    createManager(null, null, queryResults);
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-    Region region = Fakes.region(regionName, cache);
-    catchException(this.mgr).read(region, "1");
-    assertThat((Exception) caughtException()).isInstanceOf(IllegalStateException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("Multiple rows returned for key 1 on table " + regionName);
-  }
-
-  @Test
-  public void verifyReadThatHitsReturnsValue() throws SQLException {
-    createDefaultManager();
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
-  }
-
-  @Test
-  public void verifyReadWithValueClassName() throws SQLException {
-    createManager("valueClassName-" + regionName, "myValueClass");
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("myValueClass")).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
-  }
-
-  @Test
-  public void verifyReadWithKeyPartOfValue() throws SQLException {
-    createManager("isKeyPartOfValue-" + regionName, "true");
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(3)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("id", "name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("1", "Emp1", 21));
-  }
-
-  @Test
-  public void verifyRegionToTable() throws SQLException {
-    String tableName = "mySqlTable";
-    createManagerWithTableName(tableName, "regionToTable-" + regionName, tableName);
-    GemFireCacheImpl cache = Fakes.cache();
-    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
-    PdxInstance pi = mock(PdxInstance.class);
-    when(factory.create()).thenReturn(pi);
-    when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
-
-    Region region = Fakes.region(regionName, cache);
-    Object key = "1";
-    PdxInstance value = this.mgr.read(region, key);
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue())
-        .isEqualTo("SELECT * FROM " + tableName + " WHERE " + ID_COLUMN_NAME + " = ?");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("1");
-    assertThat(value).isSameAs(pi);
-    ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
-        any());
-    assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
-    assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21));
-  }
-
-  @Test
-  public void verifyFieldToColumn() throws SQLException {
-    createManager("fieldToColumn", "name:columnName, " + regionName + ":age:columnAge");
-    GemFireCacheImpl cache = Fakes.cache();
-    Region region = Fakes.region(regionName, cache);
-    PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
-    this.mgr.write(region, Operation.CREATE, "1", pdx1);
-    verify(this.preparedStatement).execute();
-    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
-    verify(this.connection).prepareStatement(sqlCaptor.capture());
-    assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + "columnName"
-        + ", " + "columnAge" + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
-    ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
-    verify(this.preparedStatement, times(3)).setObject(anyInt(), objectCaptor.capture());
-    List<Object> allObjects = objectCaptor.getAllValues();
-    assertThat(allObjects.get(0)).isEqualTo("Emp1");
-    assertThat(allObjects.get(1)).isEqualTo(21);
-    assertThat(allObjects.get(2)).isEqualTo("1");
-  }
+  public void closesAllConnections() throws Exception {
+    JDBCManager spyManager = spy(manager);
+    Connection connection = mock(Connection.class);
+    Connection secondConnection = mock(Connection.class);
+    JDBCConnectionConfiguration connectionConfig =
+        getTestConnectionConfig("name", "url", null, null);
+    JDBCConnectionConfiguration secondConnectionConfig =
+        getTestConnectionConfig("newName", "url", null, null);
+
+    doReturn(connection).when(spyManager).getSQLConnection(connectionConfig);
+    doReturn(secondConnection).when(spyManager).getSQLConnection(secondConnectionConfig);
+    spyManager.getConnection(connectionConfig);
+    spyManager.getConnection(secondConnectionConfig);
+
+    spyManager.close();
+    verify(connection).close();
+    verify(secondConnection).close();
+  }
+
+  private JDBCConnectionConfiguration getTestConnectionConfig(String name, String url, String user,
+      String password) {
+    JDBCConnectionConfiguration config = new JDBCConnectionConfiguration();
+    config.setName(name);
+    config.setUrl(url);
+    config.setUser(user);
+    config.setPassword(password);
+    return config;
+  }
+
+
+  /*
+   * private JDBCManager mgr; private String regionName = "jdbcRegion"; private
+   * JDBCConfigurationService configService; private Connection connection; private
+   * PreparedStatement preparedStatement; private PreparedStatement preparedStatement2;
+   * 
+   * private static final String ID_COLUMN_NAME = "ID"; private static final String NAME_COLUMN_NAME
+   * = "name"; private static final String AGE_COLUMN_NAME = "age";
+   */
+
+  /*
+   * public class TestableUpsertJDBCManager extends JDBCManager {
+   * 
+   * final int upsertReturn;
+   * 
+   * TestableUpsertJDBCManager(JDBCConfigurationService config) { super(config); upsertReturn = 1; }
+   * 
+   * TestableUpsertJDBCManager(JDBCConfigurationService config, int upsertReturn) { super(config);
+   * this.upsertReturn = upsertReturn; }
+   * 
+   * @Override protected Connection createConnection(String url, String user, String password)
+   * throws SQLException { ResultSet rsKeys = mock(ResultSet.class);
+   * when(rsKeys.next()).thenReturn(true, false);
+   * when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
+   * 
+   * ResultSet rs = mock(ResultSet.class); when(rs.next()).thenReturn(true, false);
+   * when(rs.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
+   * 
+   * DatabaseMetaData metaData = mock(DatabaseMetaData.class); when(metaData.getPrimaryKeys(null,
+   * null, regionName.toUpperCase())).thenReturn(rsKeys); when(metaData.getTables(any(), any(),
+   * any(), any())).thenReturn(rs);
+   * 
+   * preparedStatement = mock(PreparedStatement.class); preparedStatement2 =
+   * mock(PreparedStatement.class); when(preparedStatement.getUpdateCount()).thenReturn(0);
+   * when(preparedStatement2.getUpdateCount()).thenReturn(this.upsertReturn);
+   * 
+   * connection = mock(Connection.class); when(connection.getMetaData()).thenReturn(metaData);
+   * when(connection.prepareStatement(any())).thenReturn(preparedStatement, preparedStatement2);
+   * 
+   * return connection; } }
+   */
+
+  /*
+   * public class TestableJDBCManagerWithResultSets extends JDBCManager {
+   * 
+   * private ResultSet tableResults; private ResultSet primaryKeyResults; private ResultSet
+   * queryResultSet; private String tableName;
+   * 
+   * TestableJDBCManagerWithResultSets(String tableName, JDBCConfigurationService config, ResultSet
+   * tableResults, ResultSet primaryKeyResults, ResultSet queryResultSet) { super(config);
+   * this.tableResults = tableResults; this.primaryKeyResults = primaryKeyResults;
+   * this.queryResultSet = queryResultSet; this.tableName = tableName; }
+   * 
+   * @Override protected Connection createConnection(String url, String user, String password)
+   * throws SQLException { if (primaryKeyResults == null) { primaryKeyResults =
+   * mock(ResultSet.class); when(primaryKeyResults.next()).thenReturn(true, false);
+   * when(primaryKeyResults.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME); }
+   * 
+   * if (tableResults == null) { tableResults = mock(ResultSet.class);
+   * when(tableResults.next()).thenReturn(true, false);
+   * when(tableResults.getString("TABLE_NAME")).thenReturn(this.tableName.toUpperCase()); }
+   * 
+   * DatabaseMetaData metaData = mock(DatabaseMetaData.class); when(metaData.getPrimaryKeys(null,
+   * null, this.tableName.toUpperCase())) .thenReturn(primaryKeyResults);
+   * when(metaData.getTables(any(), any(), any(), any())).thenReturn(tableResults);
+   * 
+   * preparedStatement = mock(PreparedStatement.class);
+   * when(preparedStatement.getUpdateCount()).thenReturn(1);
+   * 
+   * 
+   * if (this.queryResultSet == null) { queryResultSet = mock(ResultSet.class); ResultSetMetaData
+   * rsmd = mock(ResultSetMetaData.class); when(rsmd.getColumnCount()).thenReturn(3);
+   * when(rsmd.getColumnName(anyInt())).thenReturn("ID", "NAME", "AGE");
+   * when(queryResultSet.getMetaData()).thenReturn(rsmd);
+   * when(queryResultSet.next()).thenReturn(true, false);
+   * when(queryResultSet.getObject(anyInt())).thenReturn("1", "Emp1", 21); }
+   * 
+   * when(preparedStatement.executeQuery()).thenReturn(queryResultSet);
+   * 
+   * connection = mock(Connection.class); when(connection.getMetaData()).thenReturn(metaData);
+   * when(connection.prepareStatement(any())).thenReturn(preparedStatement);
+   * 
+   * return connection; } }
+   */
+
+  /*
+   * @Before public void setUp() throws Exception { configService =
+   * mock(JDBCConfigurationService.class); }
+   * 
+   * @After public void tearDown() throws Exception {}
+   * 
+   * private void createManager(String... args) throws SQLException {
+   * createManagerWithTableName(regionName, args); }
+   * 
+   * private void createManagerWithTableName(String tableName, String... args) throws SQLException {
+   * ResultSet rsKeys = mock(ResultSet.class); when(rsKeys.next()).thenReturn(true, false);
+   * when(rsKeys.getString("COLUMN_NAME")).thenReturn(ID_COLUMN_NAME);
+   * 
+   * ResultSet rs = mock(ResultSet.class); when(rs.next()).thenReturn(true, false);
+   * when(rs.getString("TABLE_NAME")).thenReturn(tableName.toUpperCase());
+   * 
+   * this.mgr = new JDBCManager(tableName, createConfiguration(args), rs, rsKeys, null); }
+   * 
+   * private void createDefaultManager() throws SQLException { createManager(); }
+   * 
+   * private void createUpsertManager() { this.mgr = new
+   * TestableUpsertJDBCManager(createConfiguration()); }
+   * 
+   * private void createUpsertManager(int upsertReturn) { this.mgr = new
+   * TestableUpsertJDBCManager(createConfiguration(), upsertReturn); }
+   * 
+   * private void createManager(ResultSet tableNames, ResultSet primaryKeys, ResultSet
+   * queryResultSet, String... args) { this.mgr = new TestableJDBCManagerWithResultSets(regionName,
+   * createConfiguration(args), tableNames, primaryKeys, queryResultSet); }
+   * 
+   * private JDBCConfigurationService createConfiguration(String... args) { Properties props = new
+   * Properties(); props.setProperty("url", "fakeURL"); String[] argsArray = args; if (argsArray !=
+   * null) { assert argsArray.length % 2 == 0; for (int i = 0; i < argsArray.length; i += 2) {
+   * props.setProperty(argsArray[i], argsArray[i + 1]); } } JDBCConfigurationService service = new
+   * JDBCConfigurationService(); JDBCConnectionConfiguration connectionConfig =
+   * service.addOrUpdateConnectionConfig(conne); return service; }
+   * 
+   * @Test public void verifySimpleCreateCallsExecute() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * this.mgr.write(region, Operation.CREATE, "1", pdx1); verify(this.preparedStatement).execute();
+   * ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
+   * + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifySimpleCreateWithIdField() throws SQLException { createDefaultManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region(regionName, cache);
+   * PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21, true); this.mgr.write(region,
+   * Operation.CREATE, "1", pdx1); verify(this.preparedStatement).execute(); ArgumentCaptor<String>
+   * sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME
+   * + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifySimpleUpdateCallsExecute() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * this.mgr.write(region, Operation.UPDATE, "1", pdx1); verify(this.preparedStatement).execute();
+   * ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME
+   * + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifySimpleDestroyCallsExecute() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); this.mgr.write(region, Operation.DESTROY, "1", null);
+   * verify(this.preparedStatement).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("DELETE FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); }
+   * 
+   * @Test public void verifyTwoCreatesReuseSameStatement() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * PdxInstanceImpl pdx2 = mockPdxInstance("Emp2", 55); this.mgr.write(region, Operation.CREATE,
+   * "1", pdx1); this.mgr.write(region, Operation.CREATE, "2", pdx2); verify(this.preparedStatement,
+   * times(2)).execute(); verify(this.connection).prepareStatement(any()); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(6)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * assertThat(allObjects.get(3)).isEqualTo("Emp2"); assertThat(allObjects.get(4)).isEqualTo(55);
+   * assertThat(allObjects.get(5)).isEqualTo("2"); }
+   * 
+   * private PdxInstanceImpl mockPdxInstance(String name, int age) { return mockPdxInstance(name,
+   * age, false); }
+   * 
+   * private PdxInstanceImpl mockPdxInstance(String name, int age, boolean addId) { PdxInstanceImpl
+   * pdxInstance = mock(PdxInstanceImpl.class); if (addId) { when(pdxInstance.getFieldNames())
+   * .thenReturn(Arrays.asList(NAME_COLUMN_NAME, AGE_COLUMN_NAME, ID_COLUMN_NAME));
+   * when(pdxInstance.getField(ID_COLUMN_NAME)).thenReturn("bogusId"); } else {
+   * when(pdxInstance.getFieldNames()) .thenReturn(Arrays.asList(NAME_COLUMN_NAME,
+   * AGE_COLUMN_NAME)); } when(pdxInstance.getField(NAME_COLUMN_NAME)).thenReturn(name);
+   * when(pdxInstance.getField(AGE_COLUMN_NAME)).thenReturn(age); PdxType pdxType =
+   * mock(PdxType.class); when(pdxType.getTypeId()).thenReturn(1);
+   * when(pdxInstance.getPdxType()).thenReturn(pdxType); return pdxInstance; }
+   * 
+   * @Test public void verifySimpleInvalidateIsUnsupported() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * catchException(this.mgr).write(region, Operation.INVALIDATE, "1", pdx1); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()).isEqualTo("unsupported operation INVALIDATE"); }
+   * 
+   * @Test public void verifyNoTableForRegion() throws SQLException { createDefaultManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region("badRegion", cache);
+   * catchException(this.mgr).write(region, Operation.DESTROY, "1", null); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage())
+   * .isEqualTo("no table was found that matches badRegion"); }
+   * 
+   * @Test public void callClose() throws SQLException { createDefaultManager(); this.mgr.close(); }
+   * 
+   * @Test public void callCloseWithConnection() throws SQLException { createDefaultManager();
+   * this.mgr.getConnection(null, null); this.mgr.close(); }
+   * 
+   * @Test public void verifyInsertUpdate() throws SQLException { createUpsertManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region(regionName, cache);
+   * PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21); this.mgr.write(region, Operation.CREATE,
+   * "1", pdx1); verify(this.preparedStatement).execute();
+   * verify(this.preparedStatement2).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class); verify(this.connection,
+   * times(2)).prepareStatement(sqlCaptor.capture()); List<String> allArgs =
+   * sqlCaptor.getAllValues(); assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName +
+   * "(" + NAME_COLUMN_NAME + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
+   * assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME +
+   * " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
+   * allObjects = objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifyUpdateInsert() throws SQLException { createUpsertManager();
+   * GemFireCacheImpl cache = Fakes.cache(); Region region = Fakes.region(regionName, cache);
+   * PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21); this.mgr.write(region, Operation.UPDATE,
+   * "1", pdx1); verify(this.preparedStatement).execute();
+   * verify(this.preparedStatement2).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class); verify(this.connection,
+   * times(2)).prepareStatement(sqlCaptor.capture()); List<String> allArgs =
+   * sqlCaptor.getAllValues(); assertThat(allArgs.get(0)).isEqualTo("UPDATE " + regionName + " SET "
+   * + NAME_COLUMN_NAME + " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?");
+   * assertThat(allArgs.get(1)).isEqualTo("INSERT INTO " + regionName + "(" + NAME_COLUMN_NAME +
+   * ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
+   * allObjects = objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void verifyInsertUpdateThatUpdatesNothing() throws SQLException {
+   * createUpsertManager(0); GemFireCacheImpl cache = Fakes.cache(); Region region =
+   * Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1", 21);
+   * catchException(this.mgr).write(region, Operation.CREATE, "1", pdx1); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()).isEqualTo("Unexpected updateCount 0");
+   * verify(this.preparedStatement).execute(); verify(this.preparedStatement2).execute();
+   * ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+   * verify(this.connection, times(2)).prepareStatement(sqlCaptor.capture()); List<String> allArgs =
+   * sqlCaptor.getAllValues(); assertThat(allArgs.get(0)).isEqualTo("INSERT INTO " + regionName +
+   * "(" + NAME_COLUMN_NAME + ", " + AGE_COLUMN_NAME + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)");
+   * assertThat(allArgs.get(1)).isEqualTo("UPDATE " + regionName + " SET " + NAME_COLUMN_NAME +
+   * " = ?, " + AGE_COLUMN_NAME + " = ? WHERE " + ID_COLUMN_NAME + " = ?"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1");
+   * verify(this.preparedStatement2, times(3)).setObject(anyInt(), objectCaptor.capture());
+   * allObjects = objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   * 
+   * @Test public void twoTablesOfSameName() throws SQLException { ResultSet primaryKeys = null;
+   * ResultSet tables = mock(ResultSet.class); when(tables.next()).thenReturn(true, true, false);
+   * when(tables.getString("TABLE_NAME")).thenReturn(regionName.toUpperCase());
+   * createManager(tables, primaryKeys, null);
+   * catchException(this.mgr).computeKeyColumnName(regionName); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()).isEqualTo("Duplicate tables that match region name"
+   * ); }
+   * 
+   * @Test public void noPrimaryKeyOnTable() throws SQLException { ResultSet tables = null;
+   * ResultSet primaryKeys = mock(ResultSet.class); when(primaryKeys.next()).thenReturn(false);
+   * createManager(tables, primaryKeys, null);
+   * catchException(this.mgr).computeKeyColumnName(regionName); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()) .isEqualTo("The table " + regionName +
+   * " does not have a primary key column."); }
+   * 
+   * @Test public void twoPrimaryKeysOnTable() throws SQLException { ResultSet tables = null;
+   * ResultSet primaryKeys = mock(ResultSet.class); when(primaryKeys.next()).thenReturn(true, true,
+   * false); createManager(tables, primaryKeys, null);
+   * catchException(this.mgr).computeKeyColumnName(regionName); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage()) .isEqualTo("The table " + regionName +
+   * " has more than one primary key column."); }
+   * 
+   * @Test public void verifyReadThatMissesReturnsNull() throws SQLException { ResultSet
+   * queryResults = mock(ResultSet.class); when(queryResults.next()).thenReturn(false);
+   * createManager(null, null, queryResults); GemFireCacheImpl cache = Fakes.cache(); Region region
+   * = Fakes.region(regionName, cache); assertThat(this.mgr.read(region, "2")).isNull(); }
+   * 
+   * @Test public void verifyReadWithMultipleResultsFails() throws SQLException { ResultSet
+   * queryResults = mock(ResultSet.class); when(queryResults.next()).thenReturn(true, true, false);
+   * ResultSetMetaData rsmd = mock(ResultSetMetaData.class);
+   * when(rsmd.getColumnCount()).thenReturn(3); when(rsmd.getColumnName(anyInt())).thenReturn("ID",
+   * "NAME", "AGE"); when(queryResults.getMetaData()).thenReturn(rsmd); createManager(null, null,
+   * queryResults); GemFireCacheImpl cache = Fakes.cache(); PdxInstanceFactory factory =
+   * mock(PdxInstanceFactory.class); PdxInstance pi = mock(PdxInstance.class);
+   * when(factory.create()).thenReturn(pi); when(cache.createPdxInstanceFactory("no class",
+   * false)).thenReturn(factory); Region region = Fakes.region(regionName, cache);
+   * catchException(this.mgr).read(region, "1"); assertThat((Exception)
+   * caughtException()).isInstanceOf(IllegalStateException.class);
+   * assertThat(caughtException().getMessage())
+   * .isEqualTo("Multiple rows returned for key 1 on table " + regionName); }
+   * 
+   * @Test public void verifyReadThatHitsReturnsValue() throws SQLException {
+   * createDefaultManager(); GemFireCacheImpl cache = Fakes.cache(); PdxInstanceFactory factory =
+   * mock(PdxInstanceFactory.class); PdxInstance pi = mock(PdxInstance.class);
+   * when(factory.create()).thenReturn(pi); when(cache.createPdxInstanceFactory("no class",
+   * false)).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+   * assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21)); }
+   * 
+   * @Test public void verifyReadWithValueClassName() throws SQLException {
+   * createManager("valueClassName-" + regionName, "myValueClass"); GemFireCacheImpl cache =
+   * Fakes.cache(); PdxInstanceFactory factory = mock(PdxInstanceFactory.class); PdxInstance pi =
+   * mock(PdxInstance.class); when(factory.create()).thenReturn(pi);
+   * when(cache.createPdxInstanceFactory("myValueClass")).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+   * assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21)); }
+   * 
+   * @Test public void verifyReadWithKeyPartOfValue() throws SQLException {
+   * createManager("isKeyPartOfValue-" + regionName, "true"); GemFireCacheImpl cache =
+   * Fakes.cache(); PdxInstanceFactory factory = mock(PdxInstanceFactory.class); PdxInstance pi =
+   * mock(PdxInstance.class); when(factory.create()).thenReturn(pi);
+   * when(cache.createPdxInstanceFactory("no class", false)).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + regionName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(3)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("id", "name",
+   * "age")); assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("1", "Emp1", 21));
+   * }
+   * 
+   * @Test public void verifyRegionToTable() throws SQLException { String tableName = "mySqlTable";
+   * createManagerWithTableName(tableName, "regionToTable-" + regionName, tableName);
+   * GemFireCacheImpl cache = Fakes.cache(); PdxInstanceFactory factory =
+   * mock(PdxInstanceFactory.class); PdxInstance pi = mock(PdxInstance.class);
+   * when(factory.create()).thenReturn(pi); when(cache.createPdxInstanceFactory("no class",
+   * false)).thenReturn(factory);
+   * 
+   * Region region = Fakes.region(regionName, cache); Object key = "1"; PdxInstance value =
+   * this.mgr.read(region, key); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture()); assertThat(sqlCaptor.getValue())
+   * .isEqualTo("SELECT * FROM " + tableName + " WHERE " + ID_COLUMN_NAME + " = ?");
+   * ArgumentCaptor<Object> objectCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(this.preparedStatement, times(1)).setObject(anyInt(), objectCaptor.capture());
+   * List<Object> allObjects = objectCaptor.getAllValues();
+   * assertThat(allObjects.get(0)).isEqualTo("1"); assertThat(value).isSameAs(pi);
+   * ArgumentCaptor<String> fieldNameCaptor = ArgumentCaptor.forClass(String.class);
+   * ArgumentCaptor<Object> fieldValueCaptor = ArgumentCaptor.forClass(Object.class);
+   * verify(factory, times(2)).writeField(fieldNameCaptor.capture(), fieldValueCaptor.capture(),
+   * any()); assertThat(fieldNameCaptor.getAllValues()).isEqualTo(Arrays.asList("name", "age"));
+   * assertThat(fieldValueCaptor.getAllValues()).isEqualTo(Arrays.asList("Emp1", 21)); }
+   * 
+   * @Test public void verifyFieldToColumn() throws SQLException { createManager("fieldToColumn",
+   * "name:columnName, " + regionName + ":age:columnAge"); GemFireCacheImpl cache = Fakes.cache();
+   * Region region = Fakes.region(regionName, cache); PdxInstanceImpl pdx1 = mockPdxInstance("Emp1",
+   * 21); this.mgr.write(region, Operation.CREATE, "1", pdx1);
+   * verify(this.preparedStatement).execute(); ArgumentCaptor<String> sqlCaptor =
+   * ArgumentCaptor.forClass(String.class);
+   * verify(this.connection).prepareStatement(sqlCaptor.capture());
+   * assertThat(sqlCaptor.getValue()).isEqualTo("INSERT INTO " + regionName + "(" + "columnName" +
+   * ", " + "columnAge" + ", " + ID_COLUMN_NAME + ") VALUES (?,?,?)"); ArgumentCaptor<Object>
+   * objectCaptor = ArgumentCaptor.forClass(Object.class); verify(this.preparedStatement,
+   * times(3)).setObject(anyInt(), objectCaptor.capture()); List<Object> allObjects =
+   * objectCaptor.getAllValues(); assertThat(allObjects.get(0)).isEqualTo("Emp1");
+   * assertThat(allObjects.get(1)).isEqualTo(21); assertThat(allObjects.get(2)).isEqualTo("1"); }
+   */
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java
deleted file mode 100644
index ccdb2e0..0000000
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCPropertyParserTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- *  * agreements. See the NOTICE file distributed with this work for additional information regarding
- *  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance with the License. You may obtain a
- *  * copy of the License at
- *  *
- *  * http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software distributed under the License
- *  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- *  * or implied. See the License for the specific language governing permissions and limitations under
- *  * the License.
- *
- */
-package org.apache.geode.connectors.jdbc.internal;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.Test;
-
-public class JDBCPropertyParserTest {
-  @Test
-  public void returnsEmptyMapIfNoPropertiesPresent() {
-    Properties props = new Properties();
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, String> map = parser.getPropertiesMap("test", v -> v);
-    assertThat(map).isEmpty();
-  }
-
-  @Test
-  public void returnsMapWithKeyValuePair() {
-    Properties props = new Properties();
-    props.setProperty("name-key", "value");
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, String> map = parser.getPropertiesMap("name", v -> v);
-    assertThat(map).hasSize(1).containsKey("key").containsValue("value");
-  }
-
-  @Test
-  public void returnsMapWithMultipleKeyValuePairs() {
-    Properties props = new Properties();
-    props.setProperty("name-key1", "value1");
-    props.setProperty("name-key2", "value2");
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, String> map = parser.getPropertiesMap("name", v -> v);
-    assertThat(map).hasSize(2).containsExactly(entry("key1", "value1"), entry("key2", "value2"));
-  }
-
-  @Test
-  public void returnsMapWithBooleanValues() {
-    Properties props = new Properties();
-    props.setProperty("name-key1", "true");
-    props.setProperty("name-key2", "false");
-    JDBCPropertyParser parser = new JDBCPropertyParser(props);
-    Map<String, Boolean> map = parser.getPropertiesMap("name", Boolean::parseBoolean);
-    assertThat(map).hasSize(2).containsExactly(entry("key1", true), entry("key2", false));
-  }
-}
\ No newline at end of file
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java
new file mode 100644
index 0000000..e9a3ca7
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JDBCRegionMappingTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class JDBCRegionMappingTest {
+  private JDBCRegionMapping mapping = new JDBCRegionMapping();
+
+  @Test
+  public void initiatedWithNullValues() {
+    assertThat(mapping.getTableName()).isNull();
+    assertThat(mapping.getRegionName()).isNull();
+    assertThat(mapping.getConnectionConfigName()).isNull();
+    assertThat(mapping.getPdxClassName()).isNull();
+  }
+
+  @Test
+  public void hasCorrectTableName() {
+    String name = "name";
+    mapping.setTableName(name);
+    assertThat(mapping.getTableName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectRegionName() {
+    String name = "name";
+    mapping.setRegionName(name);
+    assertThat(mapping.getRegionName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectConfigName() {
+    String name = "name";
+    mapping.setConnectionConfigName(name);
+    assertThat(mapping.getConnectionConfigName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectPdxClassName() {
+    String name = "name";
+    mapping.setPdxClassName(name);
+    assertThat(mapping.getPdxClassName()).isEqualTo(name);
+  }
+
+  @Test
+  public void primaryKeyInValueSetCorrectly() {
+    mapping.setPrimaryKeyInValue("true");
+    assertThat(mapping.isPrimaryKeyInValue()).isTrue();
+  }
+
+  @Test
+  public void returnsFieldNameIfColumnNotMapped() {
+    String fieldName = "myField";
+    mapping.addFieldToColumnMapping("otherField", "column");
+    assertThat(mapping.getColumnNameForField(fieldName)).isEqualTo(fieldName);
+  }
+
+  @Test
+  public void returnsMappedColumnNameForField() {
+    String fieldName = "myField";
+    String columnName = "myColumn";
+    mapping.addFieldToColumnMapping(fieldName, columnName);
+    assertThat(mapping.getColumnNameForField(fieldName)).isEqualTo(columnName);
+
+  }
+}
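
Note on the JDBCRegionMappingTest cases above: the last two tests pin down a fallback rule, where a field with no explicit mapping resolves to a column of the same name. The sketch below shows one way such a rule could be satisfied. The class name FieldToColumnSketch and its map-based storage are assumptions made for illustration; the actual JDBCRegionMapping source is not shown in this part of the diff and may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; not the JDBCRegionMapping source.
class FieldToColumnSketch {
  // Explicit field-name to column-name overrides.
  private final Map<String, String> fieldToColumn = new HashMap<>();

  void addFieldToColumnMapping(String fieldName, String columnName) {
    fieldToColumn.put(fieldName, columnName);
  }

  // Returns the mapped column, or the field name itself when nothing is mapped.
  String getColumnNameForField(String fieldName) {
    return fieldToColumn.getOrDefault(fieldName, fieldName);
  }
}
```

With this shape, a lookup for "myField" after mapping only "otherField" returns "myField" unchanged, which is the behaviour returnsFieldNameIfColumnNotMapped asserts.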
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
new file mode 100644
index 0000000..55d4b0d
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+
+public class PreparedStatementCacheTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private PreparedStatementCache cache;
+  private Connection connection;
+  private List<ColumnValue> values = new ArrayList<>();
+
+  @Before
+  public void setup() throws SQLException {
+    cache = new PreparedStatementCache();
+    connection = mock(Connection.class);
+    when(connection.prepareStatement(any())).thenReturn(mock(PreparedStatement.class));
+    values.add(mock(ColumnValue.class));
+  }
+
+  @Test
+  public void returnsSameStatementForIdenticalInputs() throws Exception {
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    verify(connection, times(1)).prepareStatement(any());
+  }
+
+  @Test
+  public void returnsDifferentStatementForNonIdenticalInputs() throws Exception {
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    cache.getPreparedStatement(connection, values, "table2", Operation.UPDATE, 1);
+    verify(connection, times(2)).prepareStatement(any());
+  }
+
+  @Test()
+  public void throwsExceptionIfPreparingStatementFails() throws Exception {
+    when(connection.prepareStatement(any())).thenThrow(SQLException.class);
+    thrown.expect(IllegalStateException.class);
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+  }
+
+  @Test
+  public void throwsExceptionIfInvalidOperationGiven() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    cache.getPreparedStatement(connection, values, "table1", Operation.REGION_CLOSE, 1);
+
+  }
+}
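
Note on the PreparedStatementCacheTest cases above: the first two tests imply that statements are cached per distinct combination of inputs, so identical requests prepare only once while a different table name prepares again. The sketch below illustrates that keying idea in isolation. Everything in it is hypothetical: the class StatementCacheSketch, the choice of key fields, and the use of a Supplier in place of a real java.sql.Connection. The meaning of the trailing int argument in the tests is not stated in this diff, so it appears here only as an opaque discriminator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical illustration of input-keyed caching; not the PreparedStatementCache source.
class StatementCacheSketch<S> {

  // Cache key built from the inputs that distinguish one statement from another.
  private static final class Key {
    private final String tableName;
    private final String operation;
    private final int discriminator; // meaning assumed; the diff does not define it

    Key(String tableName, String operation, int discriminator) {
      this.tableName = tableName;
      this.operation = operation;
      this.discriminator = discriminator;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof Key)) {
        return false;
      }
      Key that = (Key) other;
      return discriminator == that.discriminator
          && tableName.equals(that.tableName)
          && operation.equals(that.operation);
    }

    @Override
    public int hashCode() {
      return Objects.hash(tableName, operation, discriminator);
    }
  }

  private final Map<Key, S> statements = new HashMap<>();

  // Invokes 'prepare' only on the first request for a given key.
  S getStatement(String tableName, String operation, int discriminator, Supplier<S> prepare) {
    return statements.computeIfAbsent(
        new Key(tableName, operation, discriminator), key -> prepare.get());
  }
}
```

Passing the preparation step in as a Supplier keeps the sketch free of JDBC and mocking dependencies; a real cache would presumably call Connection.prepareStatement at that point and translate SQLException as the third test expects.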
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java
new file mode 100644
index 0000000..a31c880
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SQLHandlerTest.java
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+import org.apache.geode.pdx.internal.PdxType;
+
+public class SQLHandlerTest {
+  private static final String REGION_NAME = "testRegion";
+  private static final String TABLE_NAME = "testTable";
+  private static final Object COLUMN_VALUE_1 = new Object();
+  private static final String COLUMN_NAME_1 = "columnName1";
+  private static final Object COLUMN_VALUE_2 = new Object();
+  private static final String COLUMN_NAME_2 = "columnName2";
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private JDBCManager manager;
+  private Region region;
+  private InternalCache cache;
+  private SQLHandler handler;
+  private PreparedStatement statement;
+  private JDBCRegionMapping regionMapping;
+  private PdxInstanceImpl value;
+
+  @Before
+  public void setup() throws Exception {
+    manager = mock(JDBCManager.class);
+    region = mock(Region.class);
+    cache = mock(InternalCache.class);
+    when(region.getRegionService()).thenReturn(cache);
+    handler = new SQLHandler(manager);
+    value = mock(PdxInstanceImpl.class);
+    when(value.getPdxType()).thenReturn(mock(PdxType.class));
+    setupManagerMock();
+  }
+
+  @Test
+  public void readThrowsExceptionIfNoKeyProvided() {
+    thrown.expect(IllegalArgumentException.class);
+    handler.read(region, null);
+  }
+
+  @Test
+  public void usesPdxFactoryForClassWhenExists() throws Exception {
+    setupEmptyResultSet();
+    String pdxClassName = "classname";
+    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
+    handler.read(region, new Object());
+
+    verify(cache).createPdxInstanceFactory(pdxClassName);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readClearsPreparedStatementWhenFinished() throws Exception {
+    setupEmptyResultSet();
+    handler.read(region, new Object());
+    verify(statement).clearParameters();
+  }
+
+  @Test
+  public void usesPdxFactoryForNoPdxClassWhenClassNonExistent() throws Exception {
+    setupEmptyResultSet();
+    handler.read(region, new Object());
+
+    verify(cache).createPdxInstanceFactory("no class", false);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readReturnsNullIfNoResultsReturned() throws Exception {
+    setupEmptyResultSet();
+    assertThat(handler.read(region, new Object())).isNull();
+  }
+
+  @Test
+  public void throwsExceptionIfQueryFails() throws Exception {
+    when(statement.executeQuery()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    handler.read(region, new Object());
+  }
+
+  @Test
+  public void readReturnsDataFromAllResultColumns() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn("key");
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+
+    String fieldName1 = COLUMN_NAME_1.toLowerCase();
+    String fieldName2 = COLUMN_NAME_2.toLowerCase();
+    handler.read(region, new Object());
+    verify(factory).writeField(fieldName1, COLUMN_VALUE_1, Object.class);
+    verify(factory).writeField(fieldName2, COLUMN_VALUE_2, Object.class);
+    verify(factory).create();
+  }
+
+  @Test
+  public void readResultOmitsKeyColumnIfNotInValue() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn(COLUMN_NAME_1);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+
+    String fieldName2 = COLUMN_NAME_2.toLowerCase();
+    handler.read(region, new Object());
+    verify(factory).writeField(fieldName2, COLUMN_VALUE_2, Object.class);
+    verify(factory, times(1)).writeField(any(), any(), any());
+    verify(factory).create();
+  }
+
+  @Test
+  public void throwsExceptionIfMoreThanOneResultReturned() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true);
+    when(result.getStatement()).thenReturn(mock(PreparedStatement.class));
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn("key");
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean()))
+        .thenReturn(mock(PdxInstanceFactory.class));
+
+    thrown.expect(IllegalStateException.class);
+    handler.read(region, new Object());
+  }
+
+  @Test
+  public void writeThrowsExceptionIfValueIsNullAndNotDoingDestroy() {
+    thrown.expect(IllegalArgumentException.class);
+    handler.write(region, Operation.UPDATE, new Object(), null);
+  }
+
+  @Test
+  public void insertActionSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement).setObject(2, COLUMN_VALUE_2);
+  }
+
+  @Test
+  public void updateActionSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement).setObject(2, COLUMN_VALUE_2);
+  }
+
+  @Test
+  public void destroyActionSucceeds() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement, times(1)).setObject(anyInt(), any());
+  }
+
+  @Test
+  public void destroyActionThatRemovesNoRowCompletesUnexceptionally() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenReturn(0);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement, times(1)).setObject(anyInt(), any());
+  }
+
+  @Test
+  public void destroyThrowExceptionWhenFail() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+  }
+
+  @Test
+  public void preparedStatementClearedAfterExecution() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).clearParameters();
+  }
+
+  @Test
+  public void whenInsertFailsUpdateSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(0);
+
+    PreparedStatement updateStatement = mock(PreparedStatement.class);
+    when(updateStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(updateStatement);
+
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(updateStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(updateStatement).clearParameters();
+  }
+
+  @Test
+  public void whenUpdateFailsInsertSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(0);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(insertStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenInsertFailsWithExceptionUpdateSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement updateStatement = mock(PreparedStatement.class);
+    when(updateStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(updateStatement);
+
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(updateStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(updateStatement).clearParameters();
+  }
+
+  @Test
+  public void whenUpdateFailsWithExceptionInsertSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(insertStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenBothInsertAndUpdateFailExceptionIsThrown() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenThrow(SQLException.class);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenStatementUpdatesMultipleRowsExceptionThrown() throws Exception {
+    when(statement.executeUpdate()).thenReturn(2);
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).clearParameters();
+  }
+
+  private void setupManagerMock() throws SQLException {
+    JDBCConnectionConfiguration connectionConfig = mock(JDBCConnectionConfiguration.class);
+    when(manager.getConnectionConfig(any())).thenReturn(connectionConfig);
+
+    regionMapping = mock(JDBCRegionMapping.class);
+    when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
+    when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
+    when(manager.getMappingForRegion(any())).thenReturn(regionMapping);
+
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    columnList.add(new ColumnValue(true, COLUMN_NAME_2, COLUMN_VALUE_2));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+
+    statement = mock(PreparedStatement.class);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement);
+  }
+
+  private void setupResultSet(ResultSet result) throws SQLException {
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(result.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(2);
+
+    when(result.getObject(1)).thenReturn(COLUMN_VALUE_1);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+
+    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
+    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
+  }
+
+  private void setupEmptyResultSet() throws SQLException {
+    ResultSet result = mock(ResultSet.class);
+    when(result.next()).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+  }
+}
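
The write tests in SQLHandlerTest describe an upsert-style fallback: when the first statement changes no row or throws an SQLException, the opposite statement is tried, and an IllegalStateException results when both fail or when more than one row is affected. The sketch below restates that behavior in plain Java. `UpsertSketch` and `RowUpdate` are hypothetical names introduced here, and the control flow is inferred from the tests only; it is not a copy of SQLHandler.

```java
import java.sql.SQLException;

// Hypothetical illustration of the insert/update fallback exercised by the tests.
public class UpsertSketch {
  /** Stand-in for PreparedStatement.executeUpdate(). */
  @FunctionalInterface
  public interface RowUpdate {
    int execute() throws SQLException;
  }

  // Runs the primary statement; if it changes no row or throws, runs the fallback.
  public static int write(RowUpdate primary, RowUpdate fallback) {
    int rows = 0;
    try {
      rows = primary.execute();
    } catch (SQLException ignored) {
      // Treated like "no row changed" so that the fallback gets a chance.
    }
    if (rows == 0) {
      try {
        rows = fallback.execute();
      } catch (SQLException e) {
        throw new IllegalStateException("both statements failed", e);
      }
    }
    if (rows != 1) {
      throw new IllegalStateException("expected exactly one affected row, got " + rows);
    }
    return rows;
  }
}
```

For a CREATE the primary would be the insert and the fallback the update; for an UPDATE the roles swap. The destroy tests accept zero affected rows, so a real handler presumably skips this check for deletes.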
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
new file mode 100644
index 0000000..0f277c0
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class SqlStatementFactoryTest {
+  private static final String TABLE_NAME = "testTable";
+  private static final String KEY_COLUMN_NAME = "column1";
+
+  private List<ColumnValue> columnValues = new ArrayList<>();
+  private SqlStatementFactory factory = new SqlStatementFactory();
+
+  @Before
+  public void setup() {
+    columnValues.add(new ColumnValue(false, "column0", null));
+    columnValues.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    columnValues.add(new ColumnValue(false, "column2", null));
+  }
+
+  @Test
+  public void getSelectQueryString() throws Exception {
+    List<ColumnValue> keyColumn = new ArrayList<>();
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    String statement = factory.createSelectQueryString(TABLE_NAME, keyColumn);
+    String expectedStatement =
+        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getDestroySqlString() throws Exception {
+    List<ColumnValue> keyColumn = new ArrayList<>();
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    String statement = factory.createDestroySqlString(TABLE_NAME, keyColumn);
+    String expectedStatement =
+        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getUpdateSqlString() throws Exception {
+    String statement = factory.createUpdateSqlString(TABLE_NAME, columnValues);
+    String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
+        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(2).getColumnName(),
+        KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getInsertSqlString() throws Exception {
+    String statement = factory.createInsertSqlString(TABLE_NAME, columnValues);
+    String expectedStatement = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?,?,?)",
+        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(1).getColumnName(),
+        columnValues.get(2).getColumnName());
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+}
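
SqlStatementFactoryTest fixes the exact SQL text expected for each operation, with the key column in the WHERE clause and one `?` placeholder per bound value. The sketch below shows one way to build strings of that shape. `SqlStringSketch` is a hypothetical helper that takes plain column names instead of ColumnValue objects; it is an assumption for illustration, not the SqlStatementFactory from this commit.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper producing the SQL shapes the test expects.
public class SqlStringSketch {
  public static String select(String table, String keyColumn) {
    return "SELECT * FROM " + table + " WHERE " + keyColumn + " = ?";
  }

  public static String delete(String table, String keyColumn) {
    return "DELETE FROM " + table + " WHERE " + keyColumn + " = ?";
  }

  // Non-key columns go into SET; the key column only appears in WHERE.
  public static String update(String table, List<String> valueColumns, String keyColumn) {
    String assignments =
        valueColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
    return "UPDATE " + table + " SET " + assignments + " WHERE " + keyColumn + " = ?";
  }

  // All columns, key included, are listed with one placeholder each.
  public static String insert(String table, List<String> columns) {
    String names = String.join(", ", columns);
    String marks = columns.stream().map(c -> "?").collect(Collectors.joining(","));
    return "INSERT INTO " + table + " (" + names + ") VALUES (" + marks + ")";
  }
}
```

Because table and column names are concatenated into the text, a helper like this is only safe when those names come from trusted configuration; the row values themselves stay behind `?` placeholders.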
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
new file mode 100644
index 0000000..34efb6a
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+public class TestConfigService {
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String REGION_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+  private static final String CONNECTION_CONFIG_NAME = "testConnectionConfig";
+
+  public static JDBCConfigurationService getTestConfigService() {
+    JDBCConfigurationService service = new JDBCConfigurationService();
+    service.addOrUpdateConnectionConfig(createConnectionConfig());
+    service.addOrUpdateRegionMapping(createRegionMapping());
+    return service;
+  }
+
+  private static JDBCRegionMapping createRegionMapping() {
+    JDBCRegionMapping mapping = new JDBCRegionMapping();
+    mapping.setConnectionConfigName(CONNECTION_CONFIG_NAME);
+    mapping.setTableName(REGION_TABLE_NAME);
+    mapping.setRegionName(REGION_NAME);
+    mapping.setPrimaryKeyInValue("false");
+    return mapping;
+  }
+
+  private static JDBCConnectionConfiguration createConnectionConfig() {
+    JDBCConnectionConfiguration config = new JDBCConnectionConfiguration();
+    config.setUrl(CONNECTION_URL);
+    config.setName(CONNECTION_CONFIG_NAME);
+    return config;
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.