You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/11/28 23:57:59 UTC

[geode] 01/06: GEODE-3781: add geode-connectors module

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4953c8f6f13b107a742fa58623b79503aa13e850
Author: Anil <ag...@pivotal.io>
AuthorDate: Mon Oct 23 12:01:58 2017 -0700

    GEODE-3781: add geode-connectors module
    
    Introduce new PDX JDBC Connector Service
    * GEODE-3782: implement JdbcWriter
    * GEODE-3783: implement JdbcAsyncWriter
    * GEODE-3823: implement JdbcLoader
---
 geode-assembly/build.gradle                        |   6 +-
 geode-connectors/build.gradle                      |  32 ++
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     | 107 ++++++
 .../apache/geode/connectors/jdbc/JdbcLoader.java   |  54 +++
 .../apache/geode/connectors/jdbc/JdbcWriter.java   | 102 ++++++
 .../jdbc/internal/AbstractJdbcCallback.java        |  63 ++++
 .../connectors/jdbc/internal/ColumnValue.java      |  44 +++
 .../jdbc/internal/ConnectionConfiguration.java     |  84 +++++
 .../jdbc/internal/ConnectionManager.java           | 193 +++++++++++
 .../internal/InternalJdbcConnectorService.java     |  30 ++
 .../jdbc/internal/JdbcConnectorService.java        |  92 ++++++
 .../jdbc/internal/PreparedStatementCache.java      | 104 ++++++
 .../connectors/jdbc/internal/RegionMapping.java    | 118 +++++++
 .../geode/connectors/jdbc/internal/SqlHandler.java | 187 +++++++++++
 .../jdbc/internal/SqlStatementFactory.java         |  78 +++++
 .../jdbc/internal/xml/ConnectionConfigBuilder.java |  48 +++
 .../connectors/jdbc/internal/xml/ElementType.java  | 131 ++++++++
 .../xml/JdbcConnectorServiceXmlGenerator.java      | 130 ++++++++
 .../xml/JdbcConnectorServiceXmlParser.java         |  57 ++++
 .../internal/xml/JdbcServiceConfiguration.java     |  60 ++++
 .../jdbc/internal/xml/RegionMappingBuilder.java    |  65 ++++
 .../org.apache.geode.internal.cache.CacheService   |   1 +
 ....apache.geode.internal.cache.xmlcache.XmlParser |   1 +
 .../org/apache/geode/connectors/jdbc/Employee.java |  54 +++
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       | 254 ++++++++++++++
 .../geode/connectors/jdbc/JdbcAsyncWriterTest.java |  93 ++++++
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java | 105 ++++++
 .../geode/connectors/jdbc/JdbcLoaderTest.java      |  44 +++
 .../connectors/jdbc/JdbcWriterIntegrationTest.java | 234 +++++++++++++
 .../geode/connectors/jdbc/JdbcWriterTest.java      | 119 +++++++
 .../jdbc/internal/AbstractJdbcCallbackTest.java    |  73 +++++
 .../connectors/jdbc/internal/ColumnValueTest.java  |  57 ++++
 .../jdbc/internal/ConnectionConfigurationTest.java |  63 ++++
 .../jdbc/internal/ConnectionManagerUnitTest.java   | 252 ++++++++++++++
 .../jdbc/internal/JdbcConnectorServiceTest.java    |  88 +++++
 .../jdbc/internal/PreparedStatementCacheTest.java  |  82 +++++
 .../jdbc/internal/RegionMappingTest.java           | 108 ++++++
 .../connectors/jdbc/internal/SqlHandlerTest.java   | 365 +++++++++++++++++++++
 .../jdbc/internal/SqlStatementFactoryTest.java     |  81 +++++
 .../jdbc/internal/TestConfigService.java           |  54 +++
 .../jdbc/internal/TestableConnectionManager.java   |  22 ++
 .../internal/xml/ConnectionConfigBuilderTest.java  |  46 +++
 .../jdbc/internal/xml/ElementTypeTest.java         | 216 ++++++++++++
 ...onnectorServiceXmlGeneratorIntegrationTest.java | 195 +++++++++++
 .../xml/JdbcConnectorServiceXmlGeneratorTest.java  |  33 ++
 .../JdbcConnectorServiceXmlIntegrationTest.java    | 116 +++++++
 .../xml/JdbcConnectorServiceXmlParserTest.java     | 106 ++++++
 .../internal/xml/JdbcServiceConfigurationTest.java | 136 ++++++++
 .../internal/xml/RegionMappingBuilderTest.java     |  54 +++
 .../java/org/apache/geode/test/fake/Fakes.java     |   5 +-
 settings.gradle                                    |   1 +
 51 files changed, 4841 insertions(+), 2 deletions(-)

diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle
index 1f139a5..c9daac4 100755
--- a/geode-assembly/build.gradle
+++ b/geode-assembly/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   archives project(':geode-client-protocol')
   archives project(':geode-json')
   archives project(':geode-core')
+  archives project(':geode-connectors')
   archives project(':geode-lucene')
   archives project(':geode-old-client-support')
   archives project(':geode-protobuf')
@@ -364,7 +365,10 @@ distributions {
 
         from project(":geode-lucene").configurations.runtime
         from project(":geode-lucene").configurations.archives.allArtifacts.files
-        
+       
+        from project(":geode-connectors").configurations.runtime
+        from project(":geode-connectors").configurations.archives.allArtifacts.files
+ 
         from project(":geode-old-client-support").configurations.runtime
         from project(":geode-old-client-support").configurations.archives.allArtifacts.files
 
diff --git a/geode-connectors/build.gradle b/geode-connectors/build.gradle
new file mode 100644
index 0000000..b11f352
--- /dev/null
+++ b/geode-connectors/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+    compile project(':geode-core')
+    compile project(':geode-common')
+
+    testCompile project(':geode-junit')
+
+    //Connectors test framework.
+    testRuntime 'org.apache.derby:derby:' + project.'derby.version'
+    testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version'
+    testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version'
+    testCompile files(project(':geode-core').sourceSets.test.output)
+    testCompile project(':geode-old-versions')
+}
+
+integrationTest.forkEvery 0
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
new file mode 100644
index 0000000..465e1c6
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CopyHelper;
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.connectors.jdbc.internal.AbstractJdbcCallback;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.pdx.PdxInstance;
+
+/**
+ * This class provides write behind cache semantics for a JDBC data source using AsyncEventListener.
+ *
+ * @since Geode 1.4
+ */
+@Experimental
+public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventListener {
+  private static final Logger logger = LogService.getLogger();
+
+  private AtomicLong totalEvents = new AtomicLong();
+  private AtomicLong successfulEvents = new AtomicLong();
+
+  @SuppressWarnings("unused")
+  public JdbcAsyncWriter() {
+    super();
+  }
+
+  // Constructor for test purposes only
+  JdbcAsyncWriter(SqlHandler sqlHandler) {
+    super(sqlHandler);
+  }
+
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    changeTotalEvents(events.size());
+
+    if (!events.isEmpty()) {
+      checkInitialized((InternalCache) events.get(0).getRegion().getRegionService());
+    }
+
+    DefaultQuery.setPdxReadSerialized(true);
+    try {
+      for (AsyncEvent event : events) {
+        try {
+          getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
+              getPdxInstance(event));
+          changeSuccessfulEvents(1);
+        } catch (RuntimeException ex) {
+          logger.error("Exception processing event {}", event, ex);
+        }
+      }
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
+
+    return true;
+  }
+
+  long getTotalEvents() {
+    return totalEvents.get();
+  }
+
+  long getSuccessfulEvents() {
+    return successfulEvents.get();
+  }
+
+  private void changeSuccessfulEvents(long delta) {
+    successfulEvents.addAndGet(delta);
+  }
+
+  private void changeTotalEvents(long delta) {
+    totalEvents.addAndGet(delta);
+  }
+
+  /**
+   * precondition: DefaultQuery.setPdxReadSerialized(true)
+   */
+  private PdxInstance getPdxInstance(AsyncEvent event) {
+    Object value = event.getDeserializedValue();
+    if (!(value instanceof PdxInstance)) {
+      value = CopyHelper.copy(value);
+    }
+    return (PdxInstance) value;
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
new file mode 100644
index 0000000..f3ba9a5
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.connectors.jdbc.internal.AbstractJdbcCallback;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+
+/**
+ * This class provides loading from a data source using JDBC.
+ *
+ * @since Geode 1.4
+ */
+@Experimental
+public class JdbcLoader<K, V> extends AbstractJdbcCallback implements CacheLoader<K, V> {
+
+  @SuppressWarnings("unused")
+  public JdbcLoader() {
+    super();
+  }
+
+  // Constructor for test purposes only
+  JdbcLoader(SqlHandler sqlHandler) {
+    super(sqlHandler);
+  }
+
+  /**
+   * @return this method always returns a PdxInstance. It does not matter what the V generic
+   *         parameter is set to.
+   */
+  @Override
+  public V load(LoaderHelper<K, V> helper) throws CacheLoaderException {
+    // The following cast to V is to keep the compiler happy
+    // but is erased at runtime and no actual cast happens.
+    checkInitialized((InternalCache) helper.getRegion().getRegionService());
+    return (V) getSqlHandler().read(helper.getRegion(), helper.getKey());
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
new file mode 100644
index 0000000..c901d3d
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcWriter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import org.apache.geode.CopyHelper;
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.SerializedCacheValue;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.connectors.jdbc.internal.AbstractJdbcCallback;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstance;
+
+/**
+ * This class provides synchronous write through to a data source using JDBC.
+ *
+ * @since Geode 1.4
+ */
+@Experimental
+public class JdbcWriter<K, V> extends AbstractJdbcCallback implements CacheWriter<K, V> {
+
+  @SuppressWarnings("unused")
+  public JdbcWriter() {
+    super();
+  }
+
+  // Constructor for test purposes only
+  JdbcWriter(SqlHandler sqlHandler) {
+    super(sqlHandler);
+  }
+
+
+  @Override
+  public void beforeUpdate(EntryEvent<K, V> event) throws CacheWriterException {
+    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
+        getPdxNewValue(event));
+  }
+
+  @Override
+  public void beforeCreate(EntryEvent<K, V> event) throws CacheWriterException {
+    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
+        getPdxNewValue(event));
+  }
+
+  @Override
+  public void beforeDestroy(EntryEvent<K, V> event) throws CacheWriterException {
+    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
+        getPdxNewValue(event));
+  }
+
+  @Override
+  public void beforeRegionDestroy(RegionEvent<K, V> event) throws CacheWriterException {
+    // this event is not sent to JDBC
+  }
+
+  @Override
+  public void beforeRegionClear(RegionEvent<K, V> event) throws CacheWriterException {
+    // this event is not sent to JDBC
+  }
+
+  private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
+    DefaultQuery.setPdxReadSerialized(true);
+    try {
+      Object newValue = event.getNewValue();
+      if (!(newValue instanceof PdxInstance)) {
+        SerializedCacheValue<V> serializedNewValue = event.getSerializedNewValue();
+        if (serializedNewValue != null) {
+          newValue = serializedNewValue.getDeserializedValue();
+        } else {
+          newValue = CopyHelper.copy(newValue);
+        }
+        if (newValue != null && !(newValue instanceof PdxInstance)) {
+          String valueClassName = newValue == null ? "null" : newValue.getClass().getName();
+          throw new IllegalArgumentException(getClass().getSimpleName()
+              + " only supports PDX values; newValue is " + valueClassName);
+        }
+      }
+      return (PdxInstance) newValue;
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
new file mode 100644
index 0000000..478edd0
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallback.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.Properties;
+
+import org.apache.geode.cache.CacheCallback;
+import org.apache.geode.internal.cache.InternalCache;
+
+public abstract class AbstractJdbcCallback implements CacheCallback {
+
+  private volatile SqlHandler sqlHandler;
+
+  protected AbstractJdbcCallback() {
+    // nothing
+  }
+
+  protected AbstractJdbcCallback(SqlHandler sqlHandler) {
+    this.sqlHandler = sqlHandler;
+  }
+
+  @Override
+  public void close() {
+    if (sqlHandler != null) {
+      sqlHandler.close();
+    }
+  }
+
+  @Override
+  public void init(Properties props) {
+    // nothing
+  }
+
+  protected SqlHandler getSqlHandler() {
+    return sqlHandler;
+  }
+
+  protected void checkInitialized(InternalCache cache) {
+    if (sqlHandler == null) {
+      initialize(cache);
+    }
+  }
+
+  private synchronized void initialize(InternalCache cache) {
+    if (sqlHandler == null) {
+      InternalJdbcConnectorService service = cache.getService(InternalJdbcConnectorService.class);
+      ConnectionManager manager = new ConnectionManager(service);
+      sqlHandler = new SqlHandler(manager);
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
new file mode 100644
index 0000000..c3f44d0
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ColumnValue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+class ColumnValue {
+  private final boolean isKey;
+  private final String columnName;
+  private final Object value;
+
+  ColumnValue(boolean isKey, String columnName, Object value) {
+    this.isKey = isKey;
+    this.columnName = columnName;
+    this.value = value;
+  }
+
+  boolean isKey() {
+    return isKey;
+  }
+
+  String getColumnName() {
+    return columnName;
+  }
+
+  Object getValue() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnValue [isKey=" + isKey + ", columnName=" + columnName + ", value=" + value + "]";
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionConfiguration.java
new file mode 100644
index 0000000..fa3807d
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+public class ConnectionConfiguration {
+
+  private final String name;
+  private final String url;
+  private final String user;
+  private final String password;
+
+  public ConnectionConfiguration(String name, String url, String user, String password) {
+    this.name = name;
+    this.url = url;
+    this.user = user;
+    this.password = password;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ConnectionConfiguration that = (ConnectionConfiguration) o;
+
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }
+    if (url != null ? !url.equals(that.url) : that.url != null) {
+      return false;
+    }
+    if (user != null ? !user.equals(that.user) : that.user != null) {
+      return false;
+    }
+    return password != null ? password.equals(that.password) : that.password == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = name != null ? name.hashCode() : 0;
+    result = 31 * result + (url != null ? url.hashCode() : 0);
+    result = 31 * result + (user != null ? user.hashCode() : 0);
+    result = 31 * result + (password != null ? password.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ConnectionConfiguration{" + "name='" + name + '\'' + ", url='" + url + '\'' + ", user='"
+        + user + '\'' + ", password='" + password + '\'' + '}';
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
new file mode 100644
index 0000000..d382112
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.pdx.PdxInstance;
+
+class ConnectionManager {
+
+  private final InternalJdbcConnectorService configService;
+  private final Map<String, Connection> connectionMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, String> tableToPrimaryKeyMap = new ConcurrentHashMap<>();
+  private final ThreadLocal<PreparedStatementCache> preparedStatementCache = new ThreadLocal<>();
+
+  ConnectionManager(InternalJdbcConnectorService configService) {
+    this.configService = configService;
+  }
+
+  RegionMapping getMappingForRegion(String regionName) {
+    return configService.getMappingForRegion(regionName);
+  }
+
+  Connection getConnection(ConnectionConfiguration config) {
+    Connection connection = connectionMap.get(config.getName());
+    try {
+      if (connection != null && !connection.isClosed()) {
+        return connection;
+      }
+    } catch (SQLException ignore) {
+      // If isClosed throws fall through and connect again
+    }
+    return getNewConnection(config);
+  }
+
+  <K> List<ColumnValue> getColumnToValueList(ConnectionConfiguration config,
+      RegionMapping regionMapping, K key, PdxInstance value, Operation operation) {
+    String keyColumnName = getKeyColumnName(config, regionMapping.getTableName());
+    ColumnValue keyColumnValue = new ColumnValue(true, keyColumnName, key);
+
+    if (operation.isDestroy() || operation.isGet()) {
+      return Collections.singletonList(keyColumnValue);
+    }
+
+    List<ColumnValue> result = createColumnValueList(regionMapping, value, keyColumnName);
+    result.add(keyColumnValue);
+    return result;
+  }
+
+  void close() {
+    connectionMap.values().forEach(this::close);
+  }
+
+  String getKeyColumnName(ConnectionConfiguration connectionConfig, String tableName) {
+    return tableToPrimaryKeyMap.computeIfAbsent(tableName,
+        k -> computeKeyColumnName(connectionConfig, k));
+  }
+
+  ConnectionConfiguration getConnectionConfig(String connectionConfigName) {
+    return configService.getConnectionConfig(connectionConfigName);
+  }
+
+  PreparedStatement getPreparedStatement(Connection connection, List<ColumnValue> columnList,
+      String tableName, Operation operation, int pdxTypeId) {
+    PreparedStatementCache statementCache = preparedStatementCache.get();
+
+    if (statementCache == null) {
+      statementCache = new PreparedStatementCache();
+      preparedStatementCache.set(statementCache);
+    }
+
+    return statementCache.getPreparedStatement(connection, columnList, tableName, operation,
+        pdxTypeId);
+  }
+
+  // package protected for testing purposes only
+  Connection getSQLConnection(ConnectionConfiguration config) throws SQLException {
+    return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
+  }
+
+  private synchronized Connection getNewConnection(ConnectionConfiguration config) {
+    Connection connection;
+    try {
+      connection = getSQLConnection(config);
+    } catch (SQLException e) {
+      // TODO: consider a different exception
+      throw new IllegalStateException("Could not connect to " + config.getUrl(), e);
+    }
+    connectionMap.put(config.getName(), connection);
+    return connection;
+  }
+
+  private List<ColumnValue> createColumnValueList(RegionMapping regionMapping, PdxInstance value,
+      String keyColumnName) {
+    List<ColumnValue> result = new ArrayList<>();
+    for (String fieldName : value.getFieldNames()) {
+      String columnName = regionMapping.getColumnNameForField(fieldName);
+      if (columnName.equalsIgnoreCase(keyColumnName)) {
+        continue;
+      }
+      ColumnValue columnValue = new ColumnValue(false, columnName, value.getField(fieldName));
+      result.add(columnValue);
+    }
+    return result;
+  }
+
+  private String computeKeyColumnName(ConnectionConfiguration connectionConfig, String tableName) {
+    // TODO: check config for key column
+    String key = null;
+    try {
+      Connection connection = getConnection(connectionConfig);
+      DatabaseMetaData metaData = connection.getMetaData();
+      ResultSet tables = metaData.getTables(null, null, "%", null);
+
+      String realTableName = getTableNameFromMetaData(tableName, tables);
+      key = getPrimaryKeyColumnNameFromMetaData(realTableName, metaData);
+
+    } catch (SQLException e) {
+      handleSQLException(e);
+    }
+    return key;
+  }
+
+  private String getTableNameFromMetaData(String tableName, ResultSet tables) throws SQLException {
+    String realTableName = null;
+    while (tables.next()) {
+      String name = tables.getString("TABLE_NAME");
+      if (name.equalsIgnoreCase(tableName)) {
+        if (realTableName != null) {
+          throw new IllegalStateException("Duplicate tables that match region name");
+        }
+        realTableName = name;
+      }
+    }
+
+    if (realTableName == null) {
+      throw new IllegalStateException("no table was found that matches " + tableName);
+    }
+    return realTableName;
+  }
+
+  private String getPrimaryKeyColumnNameFromMetaData(String tableName, DatabaseMetaData metaData)
+      throws SQLException {
+    ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName);
+    if (!primaryKeys.next()) {
+      throw new IllegalStateException(
+          "The table " + tableName + " does not have a primary key column.");
+    }
+    String key = primaryKeys.getString("COLUMN_NAME");
+    if (primaryKeys.next()) {
+      throw new IllegalStateException(
+          "The table " + tableName + " has more than one primary key column.");
+    }
+    return key;
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+
+  private void close(Connection connection) {
+    if (connection != null) {
+      try {
+        connection.close();
+      } catch (SQLException ignore) {
+      }
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
new file mode 100644
index 0000000..d0bd167
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.extension.Extension;
+
+public interface InternalJdbcConnectorService extends Extension<Cache>, CacheService {
+  void addOrUpdateConnectionConfig(ConnectionConfiguration config);
+
+  void addOrUpdateRegionMapping(RegionMapping mapping);
+
+  ConnectionConfiguration getConnectionConfig(String connectionName);
+
+  RegionMapping getMappingForRegion(String regionName);
+
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java
new file mode 100644
index 0000000..9af7aeb
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlGenerator;
+import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.extension.Extensible;
+import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
+import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
+
+public class JdbcConnectorService implements InternalJdbcConnectorService {
+
+  private final Map<String, ConnectionConfiguration> connectionsByName = new ConcurrentHashMap<>();
+  private final Map<String, RegionMapping> mappingsByRegion = new ConcurrentHashMap<>();
+  private volatile InternalCache cache;
+  private boolean registered;
+
+  public ConnectionConfiguration getConnectionConfig(String connectionName) {
+    return connectionsByName.get(connectionName);
+  }
+
+  public RegionMapping getMappingForRegion(String regionName) {
+    return mappingsByRegion.get(regionName);
+  }
+
+  @Override
+  public void addOrUpdateConnectionConfig(ConnectionConfiguration config) {
+    registerAsExtension();
+    connectionsByName.put(config.getName(), config);
+  }
+
+  @Override
+  public void addOrUpdateRegionMapping(RegionMapping mapping) {
+    registerAsExtension();
+    mappingsByRegion.put(mapping.getRegionName(), mapping);
+  }
+
+  @Override
+  public void init(Cache cache) {
+    this.cache = (InternalCache) cache;
+  }
+
+  private synchronized void registerAsExtension() {
+    if (!registered) {
+      cache.getExtensionPoint().addExtension(this);
+      registered = true;
+    }
+  }
+
+  @Override
+  public Class<? extends CacheService> getInterface() {
+    return InternalJdbcConnectorService.class;
+  }
+
+  @Override
+  public CacheServiceMBeanBase getMBean() {
+    return null;
+  }
+
+  @Override
+  public XmlGenerator<Cache> getXmlGenerator() {
+    return new JdbcConnectorServiceXmlGenerator(connectionsByName.values(),
+        mappingsByRegion.values());
+  }
+
+  @Override
+  public void beforeCreate(Extensible<Cache> source, Cache cache) {
+    // nothing
+  }
+
+  @Override
+  public void onCreate(Extensible<Cache> source, Extensible<Cache> target) {
+    // nothing
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
new file mode 100644
index 0000000..bcc6700
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCache.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.geode.cache.Operation;
+
+class PreparedStatementCache {
+
+  private SqlStatementFactory statementFactory = new SqlStatementFactory();
+  // TODO: if connection lost, we will still keep the statement. Make LRU?
+  private Map<StatementKey, PreparedStatement> statements = new HashMap<>();
+
+  PreparedStatement getPreparedStatement(Connection connection, List<ColumnValue> columnList,
+      String tableName, Operation operation, int pdxTypeId) {
+    StatementKey key = new StatementKey(pdxTypeId, operation, tableName);
+    return statements.computeIfAbsent(key, k -> {
+      String sqlStr = getSqlString(tableName, columnList, operation);
+      PreparedStatement statement = null;
+      try {
+        statement = connection.prepareStatement(sqlStr);
+      } catch (SQLException e) {
+        handleSQLException(e);
+      }
+      return statement;
+    });
+  }
+
+  private String getSqlString(String tableName, List<ColumnValue> columnList, Operation operation) {
+    if (operation.isCreate()) {
+      return statementFactory.createInsertSqlString(tableName, columnList);
+    } else if (operation.isUpdate()) {
+      return statementFactory.createUpdateSqlString(tableName, columnList);
+    } else if (operation.isDestroy()) {
+      return statementFactory.createDestroySqlString(tableName, columnList);
+    } else if (operation.isGet()) {
+      return statementFactory.createSelectQueryString(tableName, columnList);
+    } else {
+      throw new IllegalArgumentException("unsupported operation " + operation);
+    }
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+
+  private static class StatementKey {
+    private final int pdxTypeId;
+    private final Operation operation;
+    private final String tableName;
+
+    StatementKey(int pdxTypeId, Operation operation, String tableName) {
+      this.pdxTypeId = pdxTypeId;
+      this.operation = operation;
+      this.tableName = tableName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      StatementKey that = (StatementKey) o;
+
+      if (pdxTypeId != that.pdxTypeId) {
+        return false;
+      }
+      if (operation != null ? !operation.equals(that.operation) : that.operation != null) {
+        return false;
+      }
+      return tableName != null ? tableName.equals(that.tableName) : that.tableName == null;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = pdxTypeId;
+      result = 31 * result + (operation != null ? operation.hashCode() : 0);
+      result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+      return result;
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
new file mode 100644
index 0000000..327775c
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class RegionMapping {
+  private final String regionName;
+  private final String pdxClassName;
+  private final String tableName;
+  private final String connectionConfigName;
+  private final boolean primaryKeyInValue;
+  private final Map<String, String> fieldToColumnMap;
+
+  public RegionMapping(String regionName, String pdxClassName, String tableName,
+      String connectionConfigName, boolean primaryKeyInValue,
+      Map<String, String> fieldToColumnMap) {
+    this.regionName = regionName;
+    this.pdxClassName = pdxClassName;
+    this.tableName = tableName;
+    this.connectionConfigName = connectionConfigName;
+    this.primaryKeyInValue = primaryKeyInValue;
+    this.fieldToColumnMap = fieldToColumnMap;
+  }
+
+  public String getConnectionConfigName() {
+    return connectionConfigName;
+  }
+
+  public String getRegionName() {
+    return regionName;
+  }
+
+  public String getPdxClassName() {
+    return pdxClassName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public boolean isPrimaryKeyInValue() {
+    return primaryKeyInValue;
+  }
+
+  public String getColumnNameForField(String fieldName) {
+    String columnName = fieldToColumnMap.get(fieldName);
+    return columnName != null ? columnName : fieldName;
+  }
+
+  public Map<String, String> getFieldToColumnMap() {
+    return Collections.unmodifiableMap(fieldToColumnMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    RegionMapping that = (RegionMapping) o;
+
+    if (primaryKeyInValue != that.primaryKeyInValue) {
+      return false;
+    }
+    if (regionName != null ? !regionName.equals(that.regionName) : that.regionName != null) {
+      return false;
+    }
+    if (pdxClassName != null ? !pdxClassName.equals(that.pdxClassName)
+        : that.pdxClassName != null) {
+      return false;
+    }
+    if (tableName != null ? !tableName.equals(that.tableName) : that.tableName != null) {
+      return false;
+    }
+    if (connectionConfigName != null ? !connectionConfigName.equals(that.connectionConfigName)
+        : that.connectionConfigName != null) {
+      return false;
+    }
+    return fieldToColumnMap != null ? fieldToColumnMap.equals(that.fieldToColumnMap)
+        : that.fieldToColumnMap == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = regionName != null ? regionName.hashCode() : 0;
+    result = 31 * result + (pdxClassName != null ? pdxClassName.hashCode() : 0);
+    result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+    result = 31 * result + (connectionConfigName != null ? connectionConfigName.hashCode() : 0);
+    result = 31 * result + (primaryKeyInValue ? 1 : 0);
+    result = 31 * result + (fieldToColumnMap != null ? fieldToColumnMap.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "RegionMapping{" + "regionName='" + regionName + '\'' + ", pdxClassName='" + pdxClassName
+        + '\'' + ", tableName='" + tableName + '\'' + ", connectionConfigName='"
+        + connectionConfigName + '\'' + ", primaryKeyInValue=" + primaryKeyInValue
+        + ", fieldToColumnMap=" + fieldToColumnMap + '}';
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
new file mode 100644
index 0000000..b73698c
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+
+public class SqlHandler {
+  private ConnectionManager manager;
+
+  public SqlHandler(ConnectionManager manager) {
+    this.manager = manager;
+  }
+
+  public void close() {
+    manager.close();
+  }
+
+  public <K, V> PdxInstance read(Region<K, V> region, K key) {
+    if (key == null) {
+      throw new IllegalArgumentException("Key for query cannot be null");
+    }
+
+    RegionMapping regionMapping = manager.getMappingForRegion(region.getName());
+    ConnectionConfiguration connectionConfig =
+        manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+
+    List<ColumnValue> columnList =
+        manager.getColumnToValueList(connectionConfig, regionMapping, key, null, Operation.GET);
+    String tableName = regionMapping.getTableName();
+    PreparedStatement statement = manager.getPreparedStatement(
+        manager.getConnection(connectionConfig), columnList, tableName, Operation.GET, 0);
+    PdxInstanceFactory factory = getPdxInstanceFactory(region, regionMapping);
+    String keyColumnName = manager.getKeyColumnName(connectionConfig, tableName);
+    return executeReadStatement(statement, columnList, factory, regionMapping, keyColumnName);
+  }
+
+  private <K, V> PdxInstanceFactory getPdxInstanceFactory(Region<K, V> region,
+      RegionMapping regionMapping) {
+    InternalCache cache = (InternalCache) region.getRegionService();
+    String valueClassName = regionMapping.getPdxClassName();
+    PdxInstanceFactory factory;
+    if (valueClassName != null) {
+      factory = cache.createPdxInstanceFactory(valueClassName);
+    } else {
+      factory = cache.createPdxInstanceFactory("no class", false);
+    }
+    return factory;
+  }
+
+  private PdxInstance executeReadStatement(PreparedStatement statement,
+      List<ColumnValue> columnList, PdxInstanceFactory factory, RegionMapping regionMapping,
+      String keyColumnName) {
+    PdxInstance pdxInstance = null;
+    synchronized (statement) {
+      try {
+        setValuesInStatement(statement, columnList);
+        ResultSet resultSet = statement.executeQuery();
+        if (resultSet.next()) {
+
+          ResultSetMetaData metaData = resultSet.getMetaData();
+          int ColumnsNumber = metaData.getColumnCount();
+          for (int i = 1; i <= ColumnsNumber; i++) {
+            Object columnValue = resultSet.getObject(i);
+            String columnName = metaData.getColumnName(i);
+            String fieldName = mapColumnNameToFieldName(columnName);
+            if (regionMapping.isPrimaryKeyInValue()
+                || !keyColumnName.equalsIgnoreCase(columnName)) {
+              factory.writeField(fieldName, columnValue, Object.class);
+            }
+          }
+          if (resultSet.next()) {
+            throw new IllegalStateException(
+                "Multiple rows returned for query: " + resultSet.getStatement().toString());
+          }
+          pdxInstance = factory.create();
+        }
+      } catch (SQLException e) {
+        handleSQLException(e);
+      } finally {
+        clearStatementParameters(statement);
+      }
+    }
+    return pdxInstance;
+  }
+
+  private void setValuesInStatement(PreparedStatement statement, List<ColumnValue> columnList)
+      throws SQLException {
+    int index = 0;
+    for (ColumnValue columnValue : columnList) {
+      index++;
+      statement.setObject(index, columnValue.getValue());
+    }
+  }
+
+  private String mapColumnNameToFieldName(String columnName) {
+    return columnName.toLowerCase();
+  }
+
+  public <K, V> void write(Region<K, V> region, Operation operation, K key, PdxInstance value) {
+    if (value == null && operation != Operation.DESTROY) {
+      throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
+    }
+    RegionMapping regionMapping = manager.getMappingForRegion(region.getName());
+    final String tableName = regionMapping.getTableName();
+    ConnectionConfiguration connectionConfig =
+        manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+    List<ColumnValue> columnList =
+        manager.getColumnToValueList(connectionConfig, regionMapping, key, value, operation);
+
+    int pdxTypeId = value == null ? 0 : ((PdxInstanceImpl) value).getPdxType().getTypeId();
+    PreparedStatement statement = manager.getPreparedStatement(
+        manager.getConnection(connectionConfig), columnList, tableName, operation, pdxTypeId);
+    int updateCount = executeWriteStatement(statement, columnList, operation, false);
+
+    // Destroy action not guaranteed to modify any database rows
+    if (operation.isDestroy()) {
+      return;
+    }
+
+    if (updateCount <= 0) {
+      Operation upsertOp = getOppositeOperation(operation);
+      PreparedStatement upsertStatement = manager.getPreparedStatement(
+          manager.getConnection(connectionConfig), columnList, tableName, upsertOp, pdxTypeId);
+      updateCount = executeWriteStatement(upsertStatement, columnList, upsertOp, true);
+    }
+
+    if (updateCount != 1) {
+      throw new IllegalStateException("Unexpected updateCount " + updateCount);
+    }
+  }
+
+  private Operation getOppositeOperation(Operation operation) {
+    return operation.isUpdate() ? Operation.CREATE : Operation.UPDATE;
+  }
+
+  private int executeWriteStatement(PreparedStatement statement, List<ColumnValue> columnList,
+      Operation operation, boolean handleException) {
+    int updateCount = 0;
+    synchronized (statement) {
+      try {
+        setValuesInStatement(statement, columnList);
+        updateCount = statement.executeUpdate();
+      } catch (SQLException e) {
+        if (handleException || operation.isDestroy()) {
+          handleSQLException(e);
+        }
+      } finally {
+        clearStatementParameters(statement);
+      }
+    }
+    return updateCount;
+  }
+
+  private void clearStatementParameters(PreparedStatement statement) {
+    try {
+      statement.clearParameters();
+    } catch (SQLException ignore) {
+    }
+  }
+
+  private void handleSQLException(SQLException e) {
+    throw new IllegalStateException("NYI: handleSQLException", e);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
new file mode 100644
index 0000000..d5367ef
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import java.util.List;
+
+class SqlStatementFactory {
+
+  String createSelectQueryString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    return "SELECT * FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+  }
+
+  String createDestroySqlString(String tableName, List<ColumnValue> columnList) {
+    assert columnList.size() == 1;
+    ColumnValue keyCV = columnList.get(0);
+    assert keyCV.isKey();
+    return "DELETE FROM " + tableName + " WHERE " + keyCV.getColumnName() + " = ?";
+  }
+
+  String createUpdateSqlString(String tableName, List<ColumnValue> columnList) {
+    StringBuilder query = new StringBuilder("UPDATE " + tableName + " SET ");
+    int idx = 0;
+    for (ColumnValue column : columnList) {
+      if (!column.isKey()) {
+        idx++;
+        if (idx > 1) {
+          query.append(", ");
+        }
+        query.append(column.getColumnName());
+        query.append(" = ?");
+      }
+    }
+    for (ColumnValue column : columnList) {
+      if (column.isKey()) {
+        query.append(" WHERE ");
+        query.append(column.getColumnName());
+        query.append(" = ?");
+        // currently only support simple primary key with one column
+        break;
+      }
+    }
+    return query.toString();
+  }
+
+  String createInsertSqlString(String tableName, List<ColumnValue> columnList) {
+    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + " (");
+    StringBuilder columnValues = new StringBuilder(" VALUES (");
+    int columnCount = columnList.size();
+    int idx = 0;
+    for (ColumnValue column : columnList) {
+      idx++;
+      columnNames.append(column.getColumnName());
+      columnValues.append('?');
+      if (idx != columnCount) {
+        columnNames.append(", ");
+        columnValues.append(",");
+      }
+    }
+    columnNames.append(")");
+    columnValues.append(")");
+    return columnNames.append(columnValues).toString();
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ConnectionConfigBuilder.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ConnectionConfigBuilder.java
new file mode 100644
index 0000000..34d61ad
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ConnectionConfigBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+
+class ConnectionConfigBuilder {
+  private String name;
+  private String url;
+  private String user;
+  private String password;
+
+  ConnectionConfigBuilder withName(String name) {
+    this.name = name;
+    return this;
+  }
+
+  ConnectionConfigBuilder withUrl(String url) {
+    this.url = url;
+    return this;
+  }
+
+  ConnectionConfigBuilder withUser(String user) {
+    this.user = user;
+    return this;
+  }
+
+  ConnectionConfigBuilder withPassword(String password) {
+    this.password = password;
+    return this;
+  }
+
+  ConnectionConfiguration build() {
+    return new ConnectionConfiguration(name, url, user, password);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java
new file mode 100644
index 0000000..580910c
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+
+import org.apache.geode.cache.CacheXmlException;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.xmlcache.CacheCreation;
+
+enum ElementType {
+  CONNECTION_SERVICE("connector-service") {
+    @Override
+    void startElement(Stack<Object> stack, Attributes attributes) {
+      if (!(stack.peek() instanceof CacheCreation)) {
+        throw new CacheXmlException(
+            "jdbc <connector-service> elements must occur within <cache> elements");
+      }
+      CacheCreation cacheCreation = (CacheCreation) stack.peek();
+      JdbcServiceConfiguration serviceConfig = new JdbcServiceConfiguration();
+      cacheCreation.getExtensionPoint().addExtension(serviceConfig);
+      stack.push(serviceConfig);
+    }
+
+    @Override
+    void endElement(Stack<Object> stack) {
+      stack.pop();
+    }
+  },
+  CONNECTION("connection") {
+    @Override
+    void startElement(Stack<Object> stack, Attributes attributes) {
+      if (!(stack.peek() instanceof JdbcServiceConfiguration)) {
+        throw new CacheXmlException(
+            "jdbc <connection> elements must occur within <connector-service> elements");
+      }
+      ConnectionConfigBuilder connectionConfig = new ConnectionConfigBuilder()
+          .withName(attributes.getValue(JdbcConnectorServiceXmlParser.NAME))
+          .withUrl(attributes.getValue(JdbcConnectorServiceXmlParser.URL))
+          .withUser(attributes.getValue(JdbcConnectorServiceXmlParser.USER))
+          .withPassword(attributes.getValue(JdbcConnectorServiceXmlParser.PASSWORD));
+      stack.push(connectionConfig);
+    }
+
+    @Override
+    void endElement(Stack<Object> stack) {
+      ConnectionConfiguration config = ((ConnectionConfigBuilder) stack.pop()).build();
+      JdbcServiceConfiguration connectorService = (JdbcServiceConfiguration) stack.peek();
+      connectorService.addConnectionConfig(config);
+    }
+  },
+  REGION_MAPPING("region-mapping") {
+    @Override
+    void startElement(Stack<Object> stack, Attributes attributes) {
+      if (!(stack.peek() instanceof JdbcServiceConfiguration)) {
+        throw new CacheXmlException(
+            "jdbc <region-mapping> elements must occur within <connector-service> elements");
+      }
+      RegionMappingBuilder mapping = new RegionMappingBuilder()
+          .withRegionName(attributes.getValue(JdbcConnectorServiceXmlParser.REGION))
+          .withConnectionConfigName(
+              attributes.getValue(JdbcConnectorServiceXmlParser.CONNECTION_NAME))
+          .withTableName(attributes.getValue(JdbcConnectorServiceXmlParser.TABLE))
+          .withPdxClassName(attributes.getValue(JdbcConnectorServiceXmlParser.PDX_CLASS))
+          .withPrimaryKeyInValue(
+              attributes.getValue(JdbcConnectorServiceXmlParser.PRIMARY_KEY_IN_VALUE));
+      stack.push(mapping);
+    }
+
+    @Override
+    void endElement(Stack<Object> stack) {
+      RegionMapping mapping = ((RegionMappingBuilder) stack.pop()).build();
+      JdbcServiceConfiguration connectorService = (JdbcServiceConfiguration) stack.peek();
+      connectorService.addRegionMapping(mapping);
+    }
+  },
+  FIELD_MAPPING("field-mapping") {
+    @Override
+    void startElement(Stack<Object> stack, Attributes attributes) {
+      if (!(stack.peek() instanceof RegionMappingBuilder)) {
+        throw new CacheXmlException(
+            "jdbc <field-mapping> elements must occur within <region-mapping> elements");
+      }
+      RegionMappingBuilder mapping = (RegionMappingBuilder) stack.peek();
+      String fieldName = attributes.getValue(JdbcConnectorServiceXmlParser.FIELD_NAME);
+      String columnName = attributes.getValue(JdbcConnectorServiceXmlParser.COLUMN_NAME);
+      mapping.withFieldToColumnMapping(fieldName, columnName);
+    }
+
+    @Override
+    void endElement(Stack<Object> stack) {}
+  };
+
+  private String typeName;
+
+  ElementType(String typeName) {
+    this.typeName = typeName;
+  }
+
+  static ElementType getTypeFromName(String typeName) {
+    for (ElementType type : ElementType.values()) {
+      if (type.typeName.equals(typeName))
+        return type;
+    }
+    throw new IllegalArgumentException("Invalid type '" + typeName + "'");
+  }
+
+  String getTypeName() {
+    return typeName;
+  }
+
+  abstract void startElement(Stack<Object> stack, Attributes attributes);
+
+  abstract void endElement(Stack<Object> stack);
+
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGenerator.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGenerator.java
new file mode 100644
index 0000000..867dd7f
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGenerator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.COLUMN_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.CONNECTION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.FIELD_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAMESPACE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PASSWORD;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PDX_CLASS;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PRIMARY_KEY_IN_VALUE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.REGION;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.TABLE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.URL;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.USER;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.AttributesImpl;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator;
+import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
+import org.apache.geode.internal.cache.xmlcache.XmlGeneratorUtils;
+
+public class JdbcConnectorServiceXmlGenerator implements XmlGenerator<Cache> {
+  private static final AttributesImpl EMPTY = new AttributesImpl();
+  static final String PREFIX = "jdbc";
+
+  private final Collection<ConnectionConfiguration> connections;
+  private final Collection<RegionMapping> mappings;
+
+  public JdbcConnectorServiceXmlGenerator(Collection<ConnectionConfiguration> connections,
+      Collection<RegionMapping> mappings) {
+    this.connections = connections != null ? connections : Collections.emptyList();
+    this.mappings = mappings != null ? mappings : Collections.emptyList();
+  }
+
+  @Override
+  public String getNamespaceUri() {
+    return NAMESPACE;
+  }
+
+  @Override
+  public void generate(CacheXmlGenerator cacheXmlGenerator) throws SAXException {
+    final ContentHandler handler = cacheXmlGenerator.getContentHandler();
+
+    handler.startPrefixMapping(PREFIX, NAMESPACE);
+    XmlGeneratorUtils.startElement(handler, PREFIX, ElementType.CONNECTION_SERVICE.getTypeName(),
+        EMPTY);
+    for (ConnectionConfiguration connection : connections) {
+      outputConnectionConfiguration(handler, connection);
+    }
+    for (RegionMapping mapping : mappings) {
+      outputRegionMapping(handler, mapping);
+    }
+    XmlGeneratorUtils.endElement(handler, PREFIX, ElementType.CONNECTION_SERVICE.getTypeName());
+  }
+
+  /**
+   * For testing only
+   */
+  Collection<ConnectionConfiguration> getConnections() {
+    return connections;
+  }
+
+  /**
+   * For testing only
+   */
+  Collection<RegionMapping> getMappings() {
+    return mappings;
+  }
+
+  private void outputConnectionConfiguration(ContentHandler handler, ConnectionConfiguration config)
+      throws SAXException {
+    AttributesImpl attributes = new AttributesImpl();
+    XmlGeneratorUtils.addAttribute(attributes, NAME, config.getName());
+    XmlGeneratorUtils.addAttribute(attributes, URL, config.getUrl());
+    XmlGeneratorUtils.addAttribute(attributes, USER, config.getUser());
+    XmlGeneratorUtils.addAttribute(attributes, PASSWORD, config.getPassword());
+    XmlGeneratorUtils.emptyElement(handler, PREFIX, ElementType.CONNECTION.getTypeName(),
+        attributes);
+  }
+
+  private void outputRegionMapping(ContentHandler handler, RegionMapping mapping)
+      throws SAXException {
+    AttributesImpl attributes = new AttributesImpl();
+    XmlGeneratorUtils.addAttribute(attributes, CONNECTION_NAME, mapping.getConnectionConfigName());
+    XmlGeneratorUtils.addAttribute(attributes, REGION, mapping.getRegionName());
+    XmlGeneratorUtils.addAttribute(attributes, TABLE, mapping.getTableName());
+    XmlGeneratorUtils.addAttribute(attributes, PDX_CLASS, mapping.getPdxClassName());
+    XmlGeneratorUtils.addAttribute(attributes, PRIMARY_KEY_IN_VALUE,
+        Boolean.toString(mapping.isPrimaryKeyInValue()));
+
+    XmlGeneratorUtils.startElement(handler, PREFIX, ElementType.REGION_MAPPING.getTypeName(),
+        attributes);
+    addFieldMappings(handler, mapping.getFieldToColumnMap());
+    XmlGeneratorUtils.endElement(handler, PREFIX, ElementType.REGION_MAPPING.getTypeName());
+  }
+
+  private void addFieldMappings(ContentHandler handler, Map<String, String> fieldMappings)
+      throws SAXException {
+    for (Map.Entry<String, String> fieldMapping : fieldMappings.entrySet()) {
+      AttributesImpl fieldAttributes = new AttributesImpl();
+      XmlGeneratorUtils.addAttribute(fieldAttributes, FIELD_NAME, fieldMapping.getKey());
+      XmlGeneratorUtils.addAttribute(fieldAttributes, COLUMN_NAME, fieldMapping.getValue());
+      XmlGeneratorUtils.emptyElement(handler, PREFIX, ElementType.FIELD_MAPPING.getTypeName(),
+          fieldAttributes);
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java
new file mode 100644
index 0000000..7caaa1d
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+
+import org.apache.geode.internal.cache.xmlcache.AbstractXmlParser;
+
+public class JdbcConnectorServiceXmlParser extends AbstractXmlParser {
+  static final String NAMESPACE = "http://geode.apache.org/schema/jdbc-connector";
+  static final String NAME = "name";
+  static final String URL = "url";
+  static final String USER = "user";
+  static final String PASSWORD = "password";
+  static final String REGION = "region";
+  static final String CONNECTION_NAME = "connection-name";
+  static final String TABLE = "table";
+  static final String PDX_CLASS = "pdx-class";
+  static final String FIELD_NAME = "field-name";
+  static final String COLUMN_NAME = "column-name";
+  static final String PRIMARY_KEY_IN_VALUE = "primary-key-in-value";
+
+  @Override
+  public String getNamespaceUri() {
+    return NAMESPACE;
+  }
+
+  @Override
+  public void startElement(String uri, String localName, String qName, Attributes attributes)
+      throws SAXException {
+    if (!NAMESPACE.equals(uri)) {
+      return;
+    }
+    ElementType.getTypeFromName(localName).startElement(stack, attributes);
+  }
+
+  @Override
+  public void endElement(String uri, String localName, String qName) throws SAXException {
+    if (!NAMESPACE.equals(uri)) {
+      return;
+    }
+    ElementType.getTypeFromName(localName).endElement(stack);
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcServiceConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcServiceConfiguration.java
new file mode 100644
index 0000000..2892334
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcServiceConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.extension.Extensible;
+import org.apache.geode.internal.cache.extension.Extension;
+import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
+
+public class JdbcServiceConfiguration implements Extension<Cache> {
+
+  private final List<ConnectionConfiguration> connections = new ArrayList<>();
+  private final List<RegionMapping> mappings = new ArrayList<>();
+
+  void addConnectionConfig(ConnectionConfiguration config) {
+    connections.add(config);
+  }
+
+  void addRegionMapping(RegionMapping mapping) {
+    mappings.add(mapping);
+  }
+
+  @Override
+  public XmlGenerator<Cache> getXmlGenerator() {
+    return new JdbcConnectorServiceXmlGenerator(connections, mappings);
+  }
+
+  @Override
+  public void beforeCreate(Extensible<Cache> source, Cache cache) {
+    // nothing
+  }
+
+  @Override
+  public void onCreate(Extensible<Cache> source, Extensible<Cache> target) {
+    InternalCache internalCache = (InternalCache) target;
+    InternalJdbcConnectorService service =
+        internalCache.getService(InternalJdbcConnectorService.class);
+    connections.forEach(connection -> service.addOrUpdateConnectionConfig(connection));
+    mappings.forEach(mapping -> service.addOrUpdateRegionMapping(mapping));
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingBuilder.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingBuilder.java
new file mode 100644
index 0000000..b4adcaf
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+
+class RegionMappingBuilder {
+
+  private String regionName;
+  private String pdxClassName;
+  private String tableName;
+  private String connectionConfigName;
+  private boolean primaryKeyInValue;
+  private final Map<String, String> fieldToColumnMap = new HashMap<>();
+
+  RegionMappingBuilder withRegionName(String regionName) {
+    this.regionName = regionName;
+    return this;
+  }
+
+  RegionMappingBuilder withPdxClassName(String pdxClassName) {
+    this.pdxClassName = pdxClassName;
+    return this;
+  }
+
+  RegionMappingBuilder withTableName(String tableName) {
+    this.tableName = tableName;
+    return this;
+  }
+
+  RegionMappingBuilder withConnectionConfigName(String connectionConfigName) {
+    this.connectionConfigName = connectionConfigName;
+    return this;
+  }
+
+  RegionMappingBuilder withPrimaryKeyInValue(String primaryKeyInValue) {
+    this.primaryKeyInValue = Boolean.parseBoolean(primaryKeyInValue);
+    return this;
+  }
+
+  RegionMappingBuilder withFieldToColumnMapping(String fieldName, String columnMapping) {
+    this.fieldToColumnMap.put(fieldName, columnMapping);
+    return this;
+  }
+
+  RegionMapping build() {
+    return new RegionMapping(regionName, pdxClassName, tableName, connectionConfigName,
+        primaryKeyInValue, fieldToColumnMap);
+  }
+}
diff --git a/geode-connectors/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService b/geode-connectors/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService
new file mode 100644
index 0000000..bc91015
--- /dev/null
+++ b/geode-connectors/src/main/resources/META-INF/services/org.apache.geode.internal.cache.CacheService
@@ -0,0 +1 @@
+org.apache.geode.connectors.jdbc.internal.JdbcConnectorService
\ No newline at end of file
diff --git a/geode-connectors/src/main/resources/META-INF/services/org.apache.geode.internal.cache.xmlcache.XmlParser b/geode-connectors/src/main/resources/META-INF/services/org.apache.geode.internal.cache.xmlcache.XmlParser
new file mode 100644
index 0000000..9fbc42a
--- /dev/null
+++ b/geode-connectors/src/main/resources/META-INF/services/org.apache.geode.internal.cache.xmlcache.XmlParser
@@ -0,0 +1 @@
+org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser
\ No newline at end of file
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
new file mode 100644
index 0000000..82e2e90
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+
+@SuppressWarnings("unused")
+public class Employee implements PdxSerializable {
+  private String name;
+  private int age;
+
+  public Employee() {
+    // nothing
+  }
+
+  Employee(String name, int age) {
+    this.name = name;
+    this.age = age;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  int getAge() {
+    return age;
+  }
+
+  @Override
+  public void toData(PdxWriter writer) {
+    writer.writeString("name", this.name);
+    writer.writeInt("age", this.age);
+  }
+
+  @Override
+  public void fromData(PdxReader reader) {
+    this.name = reader.readString("name");
+    this.age = reader.readInt("age");
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
new file mode 100644
index 0000000..1655bd9
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
+import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class JdbcAsyncWriterIntegrationTest {
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+
+  private Cache cache;
+  private Region<String, PdxInstance> employees;
+  private Connection connection;
+  private Statement statement;
+  private JdbcAsyncWriter jdbcWriter;
+  private PdxInstance pdxEmployee1;
+  private PdxInstance pdxEmployee2;
+  private Employee employee1;
+  private Employee employee2;
+
+  @Before
+  public void setup() throws Exception {
+    cache = new CacheFactory().setPdxReadSerialized(false).create();
+    employees = createRegionWithJDBCAsyncWriter(REGION_TABLE_NAME);
+    connection = DriverManager.getConnection(CONNECTION_URL);
+    statement = connection.createStatement();
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    pdxEmployee1 = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("name", "Emp1").writeInt("age", 55).create();
+    pdxEmployee2 = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("name", "Emp2").writeInt("age", 21).create();
+    employee1 = (Employee) pdxEmployee1.getObject();
+    employee2 = (Employee) pdxEmployee2.getObject();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cache.close();
+    closeDB();
+  }
+
+  private void closeDB() throws Exception {
+    if (statement == null) {
+      statement = connection.createStatement();
+    }
+    statement.execute("Drop table " + REGION_TABLE_NAME);
+    statement.close();
+
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void validateJDBCAsyncWriterTotalEvents() {
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
+  }
+
+  @Test
+  public void canInsertIntoTable() throws Exception {
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
+    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+        .writeInt("age", 55).writeInt("id", 3).create();
+    employees.put("1", pdx1);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void putNonPdxInstanceFails() {
+    Region nonPdxEmployees = this.employees;
+    nonPdxEmployees.put("1", "non pdx instance");
+
+    awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
+
+    assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(0);
+  }
+
+  @Test
+  public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
+    Region nonPdxEmployees = this.employees;
+    Employee value = new Employee("Emp2", 22);
+    nonPdxEmployees.put("2", value);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", value);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canDestroyFromTable() throws Exception {
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    employees.destroy("1");
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(3));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canUpdateTable() throws Exception {
+    employees.put("1", pdxEmployee1);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    employees.put("1", pdxEmployee2);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canUpdateBecomeInsert() throws Exception {
+    employees.put("1", pdxEmployee1);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
+    validateTableRowCount(0);
+
+    employees.put("1", pdxEmployee2);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertBecomeUpdate() throws Exception {
+    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
+    validateTableRowCount(1);
+
+    employees.put("1", pdxEmployee1);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  private void awaitUntil(final Runnable supplier) {
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(supplier);
+  }
+
+  private void assertRecordMatchesEmployee(ResultSet resultSet, String key, Employee employee)
+      throws SQLException {
+    assertThat(resultSet.next()).isTrue();
+    assertThat(resultSet.getString("id")).isEqualTo(key);
+    assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
+    assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
+  }
+
+  private Region<String, PdxInstance> createRegionWithJDBCAsyncWriter(String regionName) {
+    jdbcWriter = new JdbcAsyncWriter(createSqlHandler());
+    cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
+        .create("jdbcAsyncQueue", jdbcWriter);
+
+    RegionFactory<String, PdxInstance> regionFactory = cache.createRegionFactory(REPLICATE);
+    regionFactory.addAsyncEventQueueId("jdbcAsyncQueue");
+    return regionFactory.create(regionName);
+  }
+
+  private void validateTableRowCount(int expected) throws Exception {
+    ResultSet resultSet = statement.executeQuery("select count(*) from " + REGION_TABLE_NAME);
+    resultSet.next();
+    int size = resultSet.getInt(1);
+    assertThat(size).isEqualTo(expected);
+  }
+
+  private SqlHandler createSqlHandler() {
+    return new SqlHandler(new TestableConnectionManager(TestConfigService.getTestConfigService()));
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
new file mode 100644
index 0000000..ef285dd
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcAsyncWriterTest {
+
+  private SqlHandler sqlHandler;
+  private JdbcAsyncWriter writer;
+
+  @Before
+  public void setup() {
+    sqlHandler = mock(SqlHandler.class);
+    writer = new JdbcAsyncWriter(sqlHandler);
+  }
+
+  @Test
+  public void throwsNullPointerExceptionIfGivenNullList() {
+    assertThatThrownBy(() -> writer.processEvents(null)).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void doesNothingIfEventListIsEmpty() {
+    writer.processEvents(Collections.emptyList());
+
+    verifyZeroInteractions(sqlHandler);
+    assertThat(writer.getSuccessfulEvents()).isZero();
+    assertThat(writer.getTotalEvents()).isZero();
+  }
+
+  @Test
+  public void writesAProvidedEvent() {
+    writer.processEvents(Collections.singletonList(createMockEvent()));
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), any());
+    assertThat(writer.getSuccessfulEvents()).isEqualTo(1);
+    assertThat(writer.getTotalEvents()).isEqualTo(1);
+  }
+
+  @Test
+  public void writesMultipleProvidedEvents() {
+    List<AsyncEvent> events = new ArrayList<>();
+    events.add(createMockEvent());
+    events.add(createMockEvent());
+    events.add(createMockEvent());
+
+    writer.processEvents(events);
+
+    verify(sqlHandler, times(3)).write(any(), any(), any(), any());
+    assertThat(writer.getSuccessfulEvents()).isEqualTo(3);
+    assertThat(writer.getTotalEvents()).isEqualTo(3);
+  }
+
+  private AsyncEvent createMockEvent() {
+    AsyncEvent event = mock(AsyncEvent.class);
+    when(event.getRegion()).thenReturn(mock(InternalRegion.class));
+    return event;
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
new file mode 100644
index 0000000..3905531
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
+import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class JdbcLoaderIntegrationTest {
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+
+  private Cache cache;
+  private Connection connection;
+  private Statement statement;
+
+  @Before
+  public void setup() throws Exception {
+    cache = new CacheFactory().setPdxReadSerialized(false).create();
+    connection = DriverManager.getConnection(CONNECTION_URL);
+    statement = connection.createStatement();
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cache.close();
+    closeDB();
+  }
+
+  private void closeDB() throws Exception {
+    if (statement == null) {
+      statement = connection.createStatement();
+    }
+    statement.execute("Drop table " + REGION_TABLE_NAME);
+    statement.close();
+
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void verifySimpleGet() throws SQLException {
+    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'Emp1', 21)");
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME);
+    PdxInstance pdx = region.get("1");
+
+    assertThat(pdx.getField("name")).isEqualTo("Emp1");
+    assertThat(pdx.getField("age")).isEqualTo(21);
+  }
+
+  @Test
+  public void verifySimpleMiss() throws SQLException {
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME);
+    PdxInstance pdx = region.get("1");
+    assertThat(pdx).isNull();
+  }
+
+  private SqlHandler createSqlHandler() {
+    return new SqlHandler(new TestableConnectionManager(TestConfigService.getTestConfigService()));
+  }
+
+  private Region<String, PdxInstance> createRegionWithJDBCLoader(String regionName) {
+    JdbcLoader<String, PdxInstance> jdbcLoader = new JdbcLoader<>(createSqlHandler());
+    RegionFactory<String, PdxInstance> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.setCacheLoader(jdbcLoader);
+    return rf.create(regionName);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
new file mode 100644
index 0000000..8b08587
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcLoaderTest {
+
+  @Test
+  public void loadReadsFromSqlHandler() {
+    SqlHandler sqlHandler = mock(SqlHandler.class);
+    JdbcLoader<Object, Object> loader = new JdbcLoader<>(sqlHandler);
+    LoaderHelper loaderHelper = mock(LoaderHelper.class);
+    when(loaderHelper.getRegion()).thenReturn(mock(InternalRegion.class));
+    loader.load(loaderHelper);
+    verify(sqlHandler, times(1)).read(any(), any());
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
new file mode 100644
index 0000000..e26b3ee
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static com.googlecode.catchexception.CatchException.catchException;
+import static com.googlecode.catchexception.CatchException.caughtException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.connectors.jdbc.internal.TestConfigService;
+import org.apache.geode.connectors.jdbc.internal.TestableConnectionManager;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class JdbcWriterIntegrationTest {
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+
+  private Cache cache;
+  private Region<String, PdxInstance> employees;
+  private Connection connection;
+  private Statement statement;
+  private JdbcWriter jdbcWriter;
+  private PdxInstance pdx1;
+  private PdxInstance pdx2;
+  private Employee employee1;
+  private Employee employee2;
+
+  @Before
+  public void setup() throws Exception {
+    cache = new CacheFactory().setPdxReadSerialized(false).create();
+    employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME);
+    connection = DriverManager.getConnection(CONNECTION_URL);
+    statement = connection.createStatement();
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    pdx1 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    pdx2 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name", "Emp2")
+        .writeInt("age", 21).create();
+    employee1 = (Employee) pdx1.getObject();
+    employee2 = (Employee) pdx2.getObject();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cache.close();
+    closeDB();
+  }
+
+  private void closeDB() throws Exception {
+    if (statement == null) {
+      statement = connection.createStatement();
+    }
+    statement.execute("Drop table " + REGION_TABLE_NAME);
+    statement.close();
+
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void canInsertIntoTable() throws Exception {
+    employees.put("1", pdx1);
+    employees.put("2", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canPutAllInsertIntoTable() throws Exception {
+    Map<String, PdxInstance> putAllMap = new HashMap<>();
+    putAllMap.put("1", pdx1);
+    putAllMap.put("2", pdx2);
+    employees.putAll(putAllMap);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
+    PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("name", "Emp1").writeInt("age", 55).writeInt("id", 3).create();
+    employees.put("1", pdxInstanceWithId);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", (Employee) pdxInstanceWithId.getObject());
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void putNonPdxInstanceFails() {
+    Region nonPdxEmployees = this.employees;
+    catchException(nonPdxEmployees).put("1", "non pdx instance");
+    assertThat((Exception) caughtException()).isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
+    Region nonPdxEmployees = this.employees;
+    Employee value = new Employee("Emp2", 22);
+    nonPdxEmployees.put("2", value);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", value);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canDestroyFromTable() throws Exception {
+    employees.put("1", pdx1);
+    employees.put("2", pdx2);
+
+    employees.destroy("1");
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canUpdateTable() throws Exception {
+    employees.put("1", pdx1);
+    employees.put("1", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canUpdateBecomeInsert() throws Exception {
+    employees.put("1", pdx1);
+
+    statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
+    validateTableRowCount(0);
+
+    employees.put("1", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertBecomeUpdate() throws Exception {
+    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
+    validateTableRowCount(1);
+
+    employees.put("1", pdx1);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  private Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName) {
+    jdbcWriter = new JdbcWriter(createSqlHandler());
+    jdbcWriter.init(new Properties());
+
+    RegionFactory<String, PdxInstance> regionFactory =
+        cache.createRegionFactory(RegionShortcut.REPLICATE);
+    regionFactory.setCacheWriter(jdbcWriter);
+    return regionFactory.create(regionName);
+  }
+
+  private void validateTableRowCount(int expected) throws Exception {
+    ResultSet resultSet = statement.executeQuery("select count(*) from " + REGION_TABLE_NAME);
+    resultSet.next();
+    int size = resultSet.getInt(1);
+    assertThat(size).isEqualTo(expected);
+  }
+
+  private SqlHandler createSqlHandler() {
+    return new SqlHandler(new TestableConnectionManager(TestConfigService.getTestConfigService()));
+  }
+
+  private void assertRecordMatchesEmployee(ResultSet resultSet, String key, Employee employee)
+      throws SQLException {
+    assertThat(resultSet.next()).isTrue();
+    assertThat(resultSet.getString("id")).isEqualTo(key);
+    assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
+    assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
new file mode 100644
index 0000000..1767c63
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcWriterTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.SerializedCacheValue;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcWriterTest {
+
+  private EntryEvent<Object, Object> entryEvent;
+  private PdxInstance pdxInstance;
+  private SqlHandler sqlHandler;
+
+  @Before
+  public void setUp() {
+    entryEvent = mock(EntryEvent.class);
+    pdxInstance = mock(PdxInstance.class);
+    SerializedCacheValue<Object> serializedNewValue = mock(SerializedCacheValue.class);
+    sqlHandler = mock(SqlHandler.class);
+
+    when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+    when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+    when(serializedNewValue.getDeserializedValue()).thenReturn(pdxInstance);
+
+  }
+
+  @Test
+  public void beforeUpdateWithPdxInstanceWritesToSqlHandler() {
+    JdbcWriter<Object, Object> writer = new JdbcWriter<>(sqlHandler);
+
+    writer.beforeUpdate(entryEvent);
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+  }
+
+  @Test
+  public void beforeUpdateWithoutPdxInstanceWritesToSqlHandler() {
+    EntryEvent<Object, Object> entryEvent = mock(EntryEvent.class);
+    Object value = new Object();
+    SerializedCacheValue<Object> serializedNewValue = mock(SerializedCacheValue.class);
+    SqlHandler sqlHander = mock(SqlHandler.class);
+
+    when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+    when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+    when(serializedNewValue.getDeserializedValue()).thenReturn(value);
+
+    JdbcWriter<Object, Object> writer = new JdbcWriter<>(sqlHander);
+
+    assertThatThrownBy(() -> writer.beforeUpdate(entryEvent))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void beforeCreateWithPdxInstanceWritesToSqlHandler() {
+    JdbcWriter<Object, Object> writer = new JdbcWriter<>(sqlHandler);
+
+    writer.beforeCreate(entryEvent);
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+  }
+
+  @Test
+  public void beforeDestroyWithPdxInstanceWritesToSqlHandler() {
+    JdbcWriter<Object, Object> writer = new JdbcWriter<>(sqlHandler);
+
+    writer.beforeDestroy(entryEvent);
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+  }
+
+  @Test
+  public void beforeRegionDestroyDoesNotWriteToSqlHandler() {
+    JdbcWriter<Object, Object> writer = new JdbcWriter<>(sqlHandler);
+
+    writer.beforeRegionDestroy(mock(RegionEvent.class));
+
+    verifyZeroInteractions(sqlHandler);
+  }
+
+  @Test
+  public void beforeRegionClearDoesNotWriteToSqlHandler() {
+    JdbcWriter<Object, Object> writer = new JdbcWriter<>(sqlHandler);
+
+    writer.beforeRegionClear(mock(RegionEvent.class));
+
+    verifyZeroInteractions(sqlHandler);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
new file mode 100644
index 0000000..2199ebc
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/AbstractJdbcCallbackTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class AbstractJdbcCallbackTest {
+
+  private AbstractJdbcCallback jdbcCallback;
+  private SqlHandler sqlHandler;
+
+  @Before
+  public void setUp() throws Exception {
+    sqlHandler = mock(SqlHandler.class);
+    jdbcCallback = new AbstractJdbcCallback(sqlHandler) {};
+  }
+
+  @Test
+  public void closesSqlHandler() throws Exception {
+    jdbcCallback.close();
+    verify(sqlHandler, times(1)).close();
+  }
+
+  @Test
+  public void returnsCorrectSqlHander() throws Exception {
+    assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
+  }
+
+  @Test
+  public void checkInitializedDoesNothingIfInitialized() {
+    jdbcCallback.checkInitialized(mock(InternalCache.class));
+    assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
+  }
+
+  @Test
+  public void initializedSqlHandlerIfNoneExists() {
+    jdbcCallback = new AbstractJdbcCallback() {};
+    InternalCache cache = mock(InternalCache.class);
+    InternalJdbcConnectorService service = mock(InternalJdbcConnectorService.class);
+    when(cache.getService(any())).thenReturn(service);
+    assertThat(jdbcCallback.getSqlHandler()).isNull();
+
+    jdbcCallback.checkInitialized(cache);
+
+    assertThat(jdbcCallback.getSqlHandler()).isNotNull();
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
new file mode 100644
index 0000000..a7468fa
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ColumnValueTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ColumnValueTest {
+  private static final String COLUMN_NAME = "columnName";
+  private static final Object VALUE = new Object();
+
+  private ColumnValue value;
+
+  @Before
+  public void setup() {
+    value = new ColumnValue(true, COLUMN_NAME, VALUE);
+  }
+
+  @Test
+  public void isKeyReturnsCorrectValue() {
+    assertThat(value.isKey()).isTrue();
+  }
+
+  @Test
+  public void hasCorrectColumnName() {
+    assertThat(value.getColumnName()).isEqualTo(COLUMN_NAME);
+  }
+
+  @Test
+  public void hasCorrectValue() {
+    assertThat(value.getValue()).isSameAs(VALUE);
+  }
+
+  @Test
+  public void toStringContainsAllVariables() {
+    assertThat(value.toString()).contains(Boolean.toString(true)).contains(COLUMN_NAME)
+        .contains(VALUE.toString());
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionConfigurationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionConfigurationTest.java
new file mode 100644
index 0000000..639e278
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionConfigurationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ConnectionConfigurationTest {
+
+  @Test
+  public void initiatedWithNullValues() {
+    ConnectionConfiguration config = new ConnectionConfiguration(null, null, null, null);
+    assertThat(config.getName()).isNull();
+    assertThat(config.getUrl()).isNull();
+    assertThat(config.getUser()).isNull();
+    assertThat(config.getPassword()).isNull();
+  }
+
+  @Test
+  public void hasCorrectName() {
+    String name = "name";
+    ConnectionConfiguration config = new ConnectionConfiguration(name, null, null, null);
+    assertThat(config.getName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectUrl() {
+    String url = "url";
+    ConnectionConfiguration config = new ConnectionConfiguration(null, url, null, null);
+    assertThat(config.getUrl()).isEqualTo(url);
+  }
+
+  @Test
+  public void hasCorrectUser() {
+    String user = "user";
+    ConnectionConfiguration config = new ConnectionConfiguration(null, null, user, null);
+    assertThat(config.getUser()).isEqualTo(user);
+  }
+
+  @Test
+  public void hasCorrectPassword() {
+    String password = "password";
+    ConnectionConfiguration config = new ConnectionConfiguration(null, null, null, password);
+    assertThat(config.getPassword()).isEqualTo(password);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
new file mode 100644
index 0000000..8358d1c
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ConnectionManagerUnitTest {
+  private static final String REGION_NAME = "testRegion";
+  private static final String TABLE_NAME = "testTable";
+  private static final String CONFIG_NAME = "configName";
+  private static final String KEY_COLUMN = "keyColumn";
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private JdbcConnectorService configService;
+  private ConnectionManager manager;
+  private Connection connection;
+  private ConnectionConfiguration connectionConfig;
+  private RegionMapping mapping;
+  private Object key = new Object();
+  private PdxInstance value = mock(PdxInstance.class);
+
+
+  @Before
+  public void setup() throws Exception {
+    configService = mock(JdbcConnectorService.class);
+    manager = spy(new ConnectionManager(configService));
+    connection = mock(Connection.class);
+
+    connectionConfig = getTestConnectionConfig("name", "url", null, null);
+    doReturn(connection).when(manager).getSQLConnection(connectionConfig);
+
+    mapping = mock(RegionMapping.class);
+    when(mapping.getTableName()).thenReturn(TABLE_NAME);
+  }
+
+  @Test
+  public void getsCorrectMapping() {
+    manager.getMappingForRegion(REGION_NAME);
+    verify(configService).getMappingForRegion(REGION_NAME);
+  }
+
+  @Test
+  public void getsCorrectConnectionConfig() {
+    manager.getConnectionConfig(CONFIG_NAME);
+    verify(configService).getConnectionConfig(CONFIG_NAME);
+  }
+
+  @Test
+  public void retrievesANewConnection() throws Exception {
+    Connection returnedConnection = manager.getConnection(connectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+  }
+
+  @Test
+  public void retrievesSameConnectionForSameConnectionConfig() throws Exception {
+    Connection returnedConnection = manager.getConnection(connectionConfig);
+    Connection secondReturnedConnection = manager.getConnection(connectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+    assertThat(secondReturnedConnection).isNotNull().isSameAs(connection);
+  }
+
+  @Test
+  public void retrievesDifferentConnectionForEachConfig() throws Exception {
+    Connection secondConnection = mock(Connection.class);
+    ConnectionConfiguration secondConnectionConfig =
+        getTestConnectionConfig("newName", "url", null, null);
+    doReturn(secondConnection).when(manager).getSQLConnection(secondConnectionConfig);
+
+    Connection returnedConnection = manager.getConnection(connectionConfig);
+    Connection secondReturnedConnection = manager.getConnection(secondConnectionConfig);
+    assertThat(returnedConnection).isNotNull().isSameAs(connection);
+    assertThat(secondReturnedConnection).isNotNull().isSameAs(secondConnection);
+    assertThat(returnedConnection).isNotSameAs(secondReturnedConnection);
+  }
+
+  @Test
+  public void retrievesANewConnectionIfCachedOneIsClosed() throws Exception {
+    manager.getConnection(connectionConfig);
+    when(connection.isClosed()).thenReturn(true);
+
+    Connection secondConnection = mock(Connection.class);
+    doReturn(secondConnection).when(manager).getSQLConnection(connectionConfig);
+    Connection secondReturnedConnection = manager.getConnection(connectionConfig);
+    assertThat(secondReturnedConnection).isSameAs(secondConnection);
+  }
+
+  @Test
+  public void closesAllConnections() throws Exception {
+    Connection secondConnection = mock(Connection.class);
+    ConnectionConfiguration secondConnectionConfig =
+        getTestConnectionConfig("newName", "url", null, null);
+
+    doReturn(secondConnection).when(manager).getSQLConnection(secondConnectionConfig);
+    manager.getConnection(connectionConfig);
+    manager.getConnection(secondConnectionConfig);
+
+    manager.close();
+    verify(connection).close();
+    verify(secondConnection).close();
+  }
+
+  @Test
+  public void returnsCorrectColumnForDestroy() throws Exception {
+    ResultSet primaryKeys = getPrimaryKeysMetadData();
+    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
+
+    List<ColumnValue> columnValueList =
+        manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.DESTROY);
+    assertThat(columnValueList).hasSize(1);
+    assertThat(columnValueList.get(0).getColumnName()).isEqualTo(KEY_COLUMN);
+  }
+
+  @Test
+  public void returnsCorrectColumnForGet() throws Exception {
+    ResultSet primaryKeys = getPrimaryKeysMetadData();
+    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
+
+    List<ColumnValue> columnValueList =
+        manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.GET);
+    assertThat(columnValueList).hasSize(1);
+    assertThat(columnValueList.get(0).getColumnName()).isEqualTo(KEY_COLUMN);
+  }
+
+  @Test
+  public void throwsExceptionIfTableHasCompositePrimaryKey() throws Exception {
+    ResultSet primaryKeys = getPrimaryKeysMetadData();
+    when(primaryKeys.next()).thenReturn(true);
+
+    thrown.expect(IllegalStateException.class);
+    manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.GET);
+  }
+
+  @Test
+  public void throwsExceptionWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
+    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+    when(connection.getMetaData()).thenReturn(metadata);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase());
+    when(metadata.getTables(any(), any(), any(), any())).thenReturn(resultSet);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Duplicate tables that match region name");
+    manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.GET);
+  }
+
+  @Test
+  public void throwsExceptionWhenDesiredTableNotFound() throws Exception {
+    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+    when(connection.getMetaData()).thenReturn(metadata);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString("TABLE_NAME")).thenReturn("otherTable");
+    when(metadata.getTables(any(), any(), any(), any())).thenReturn(resultSet);
+
+    thrown.expect(IllegalStateException.class);
+    manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.GET);
+  }
+
+  @Test
+  public void throwsExceptionWhenNoPrimaryKeyInTable() throws Exception {
+    ResultSet primaryKeys = getPrimaryKeysMetadData();
+    when(primaryKeys.next()).thenReturn(false);
+
+    thrown.expect(IllegalStateException.class);
+    manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.GET);
+  }
+
+  @Test
+  public void throwsExceptionWhenFailsToGetTableMetadata() throws Exception {
+    when(connection.getMetaData()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.GET);
+  }
+
+  @Test
+  public void returnsCorrectColumnsForUpsertOperations() throws Exception {
+    ResultSet primaryKeys = getPrimaryKeysMetadData();
+    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
+
+    String nonKeyColumn = "otherColumn";
+    when(mapping.getColumnNameForField(KEY_COLUMN)).thenReturn(KEY_COLUMN);
+    when(mapping.getColumnNameForField(nonKeyColumn)).thenReturn(nonKeyColumn);
+    when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
+
+    List<ColumnValue> columnValueList =
+        manager.getColumnToValueList(connectionConfig, mapping, key, value, Operation.UPDATE);
+    assertThat(columnValueList).hasSize(2);
+    assertThat(columnValueList.get(0).getColumnName()).isEqualTo(nonKeyColumn);
+    assertThat(columnValueList.get(1).getColumnName()).isEqualTo(KEY_COLUMN);
+  }
+
+  private ConnectionConfiguration getTestConnectionConfig(String name, String url, String user,
+      String password) {
+    ConnectionConfiguration config = new ConnectionConfiguration(name, url, user, password);
+    return config;
+  }
+
+  private ResultSet getPrimaryKeysMetadData() throws SQLException {
+    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+    when(connection.getMetaData()).thenReturn(metadata);
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(false);
+    when(resultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    when(metadata.getTables(any(), any(), any(), any())).thenReturn(resultSet);
+    ResultSet primaryKeys = mock(ResultSet.class);
+    when(metadata.getPrimaryKeys(any(), any(), anyString())).thenReturn(primaryKeys);
+    when(primaryKeys.getString("COLUMN_NAME")).thenReturn(KEY_COLUMN);
+    return primaryKeys;
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java
new file mode 100644
index 0000000..d91e4df
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.extension.ExtensionPoint;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcConnectorServiceTest {
+  private static final String TEST_CONFIG_NAME = "testConfig";
+  private static final String TEST_REGION_NAME = "testRegion";
+
+  private JdbcConnectorService service = new JdbcConnectorService();
+
+  @Before
+  public void setup() {
+    InternalCache cache = mock(InternalCache.class);
+    when(cache.getExtensionPoint()).thenReturn(mock(ExtensionPoint.class));
+    service.init(cache);
+  }
+
+  @Test
+  public void returnsNoConfigIfEmpty() {
+    assertThat(service.getConnectionConfig("foo")).isNull();
+  }
+
+  @Test
+  public void returnsNoMappingIfEmpty() {
+    assertThat(service.getMappingForRegion("foo")).isNull();
+  }
+
+  @Test
+  public void returnsCorrectConfig() {
+    ConnectionConfiguration config = mock(ConnectionConfiguration.class);
+    when(config.getName()).thenReturn(TEST_CONFIG_NAME);
+    service.addOrUpdateConnectionConfig(config);
+
+    assertThat(service.getConnectionConfig(TEST_CONFIG_NAME)).isSameAs(config);
+  }
+
+  @Test
+  public void doesNotReturnConfigWithDifferentName() {
+    ConnectionConfiguration config = mock(ConnectionConfiguration.class);
+    when(config.getName()).thenReturn("theOtherConfig");
+    service.addOrUpdateConnectionConfig(config);
+
+    assertThat(service.getConnectionConfig(TEST_CONFIG_NAME)).isNull();
+  }
+
+  @Test
+  public void returnsCorrectMapping() {
+    RegionMapping mapping = mock(RegionMapping.class);
+    when(mapping.getRegionName()).thenReturn(TEST_REGION_NAME);
+    service.addOrUpdateRegionMapping(mapping);
+
+    assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isSameAs(mapping);
+  }
+
+  @Test
+  public void doesNotReturnMappingForDifferentRegion() {
+    RegionMapping mapping = mock(RegionMapping.class);
+    when(mapping.getRegionName()).thenReturn("theOtherMapping");
+    service.addOrUpdateRegionMapping(mapping);
+
+    assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isNull();
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
new file mode 100644
index 0000000..ed78310
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/PreparedStatementCacheTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PreparedStatementCacheTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private PreparedStatementCache cache;
+  private Connection connection;
+  private List<ColumnValue> values = new ArrayList<>();
+
+  @Before
+  public void setup() throws SQLException {
+    cache = new PreparedStatementCache();
+    connection = mock(Connection.class);
+    when(connection.prepareStatement(any())).thenReturn(mock(PreparedStatement.class));
+    values.add(mock(ColumnValue.class));
+  }
+
+  @Test
+  public void returnsSameStatementForIdenticalInputs() throws Exception {
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    verify(connection, times(1)).prepareStatement(any());
+  }
+
+  @Test
+  public void returnsDifferentStatementForNonIdenticalInputs() throws Exception {
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+    cache.getPreparedStatement(connection, values, "table2", Operation.UPDATE, 1);
+    verify(connection, times(2)).prepareStatement(any());
+  }
+
+  @Test()
+  public void throwsExceptionIfPreparingStatementFails() throws Exception {
+    when(connection.prepareStatement(any())).thenThrow(SQLException.class);
+    thrown.expect(IllegalStateException.class);
+    cache.getPreparedStatement(connection, values, "table1", Operation.UPDATE, 1);
+  }
+
+  @Test
+  public void throwsExceptionIfInvalidOperationGiven() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    cache.getPreparedStatement(connection, values, "table1", Operation.REGION_CLOSE, 1);
+
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
new file mode 100644
index 0000000..1c7201c
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class RegionMappingTest {
+
+  @Test
+  public void initiatedWithNullValues() {
+    RegionMapping mapping = new RegionMapping(null, null, null, null, false, null);
+    assertThat(mapping.getTableName()).isNull();
+    assertThat(mapping.getRegionName()).isNull();
+    assertThat(mapping.getConnectionConfigName()).isNull();
+    assertThat(mapping.getPdxClassName()).isNull();
+  }
+
+  @Test
+  public void hasCorrectTableName() {
+    String name = "name";
+    RegionMapping mapping = new RegionMapping(null, null, name, null, false, null);
+    assertThat(mapping.getTableName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectRegionName() {
+    String name = "name";
+    RegionMapping mapping = new RegionMapping(name, null, null, null, false, null);
+    assertThat(mapping.getRegionName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectConfigName() {
+    String name = "name";
+    RegionMapping mapping = new RegionMapping(null, null, null, name, false, null);
+    assertThat(mapping.getConnectionConfigName()).isEqualTo(name);
+  }
+
+  @Test
+  public void hasCorrectPdxClassName() {
+    String name = "name";
+    RegionMapping mapping = new RegionMapping(null, name, null, null, false, null);
+    assertThat(mapping.getPdxClassName()).isEqualTo(name);
+  }
+
+  @Test
+  public void primaryKeyInValueSetCorrectly() {
+    RegionMapping mapping = new RegionMapping(null, null, null, null, true, null);
+    assertThat(mapping.isPrimaryKeyInValue()).isTrue();
+  }
+
+  @Test
+  public void returnsFieldNameIfColumnNotMapped() {
+    String fieldName = "myField";
+    Map<String, String> fieldMap = new HashMap<>();
+    fieldMap.put("otherField", "column");
+    RegionMapping mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    assertThat(mapping.getColumnNameForField(fieldName)).isEqualTo(fieldName);
+  }
+
+  @Test
+  public void returnsMappedColumnNameForField() {
+    String fieldName = "myField";
+    String columnName = "myColumn";
+    Map<String, String> fieldMap = new HashMap<>();
+    fieldMap.put(fieldName, columnName);
+    RegionMapping mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+    assertThat(mapping.getColumnNameForField(fieldName)).isEqualTo(columnName);
+  }
+
+  @Test
+  public void returnsAllMappings() {
+    String fieldName1 = "myField1";
+    String columnName1 = "myColumn1";
+    String fieldName2 = "myField2";
+    String columnName2 = "myColumn2";
+    Map<String, String> fieldMap = new HashMap<>();
+    fieldMap.put(fieldName1, columnName1);
+    fieldMap.put(fieldName2, columnName2);
+    RegionMapping mapping = new RegionMapping(null, null, null, null, true, fieldMap);
+
+    assertThat(mapping.getFieldToColumnMap().size()).isEqualTo(2);
+    assertThat(mapping.getFieldToColumnMap()).containsOnlyKeys(fieldName1, fieldName2);
+    assertThat(mapping.getFieldToColumnMap()).containsEntry(fieldName1, columnName1)
+        .containsEntry(fieldName2, columnName2);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
new file mode 100644
index 0000000..7cd623b
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
+import org.apache.geode.pdx.internal.PdxType;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class SqlHandlerTest {
+  private static final String REGION_NAME = "testRegion";
+  private static final String TABLE_NAME = "testTable";
+  private static final Object COLUMN_VALUE_1 = new Object();
+  private static final String COLUMN_NAME_1 = "columnName1";
+  private static final Object COLUMN_VALUE_2 = new Object();
+  private static final String COLUMN_NAME_2 = "columnName2";
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private ConnectionManager manager;
+  private Region region;
+  private InternalCache cache;
+  private SqlHandler handler;
+  private PreparedStatement statement;
+  private RegionMapping regionMapping;
+  private PdxInstanceImpl value;
+
+  @Before
+  public void setup() throws Exception {
+    manager = mock(ConnectionManager.class);
+    region = mock(Region.class);
+    cache = mock(InternalCache.class);
+    when(region.getRegionService()).thenReturn(cache);
+    handler = new SqlHandler(manager);
+    value = mock(PdxInstanceImpl.class);
+    when(value.getPdxType()).thenReturn(mock(PdxType.class));
+    setupManagerMock();
+  }
+
+  @Test
+  public void readReturnsNullIfNoKeyProvided() {
+    thrown.expect(IllegalArgumentException.class);
+    handler.read(region, null);
+  }
+
+  @Test
+  public void usesPdxFactoryForClassWhenExists() throws Exception {
+    setupEmptyResultSet();
+    String pdxClassName = "classname";
+    when(regionMapping.getPdxClassName()).thenReturn(pdxClassName);
+    handler.read(region, new Object());
+
+    verify(cache).createPdxInstanceFactory(pdxClassName);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readClearsPreparedStatementWhenFinished() throws Exception {
+    setupEmptyResultSet();
+    handler.read(region, new Object());
+    verify(statement).clearParameters();
+  }
+
+  @Test
+  public void usesPbxFactoryForNoPbxClassWhenClassNonExistent() throws Exception {
+    setupEmptyResultSet();
+    handler.read(region, new Object());
+
+    verify(cache).createPdxInstanceFactory("no class", false);
+    verifyNoMoreInteractions(cache);
+  }
+
+  @Test
+  public void readReturnsNullIfNoResultsReturned() throws Exception {
+    setupEmptyResultSet();
+    assertThat(handler.read(region, new Object())).isNull();
+  }
+
+  @Test
+  public void throwsExceptionIfQueryFails() throws Exception {
+    when(statement.executeQuery()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    handler.read(region, new Object());
+  }
+
+  @Test
+  public void readReturnsDataFromAllResultColumns() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn("key");
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+
+    String filedName1 = COLUMN_NAME_1.toLowerCase();
+    String filedName2 = COLUMN_NAME_2.toLowerCase();
+    handler.read(region, new Object());
+    verify(factory).writeField(filedName1, COLUMN_VALUE_1, Object.class);
+    verify(factory).writeField(filedName2, COLUMN_VALUE_2, Object.class);
+    verify(factory).create();
+  }
+
+  @Test
+  public void readResultOmitsKeyColumnIfNotInValue() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn(COLUMN_NAME_1);
+    PdxInstanceFactory factory = mock(PdxInstanceFactory.class);
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean())).thenReturn(factory);
+
+    String filedName2 = COLUMN_NAME_2.toLowerCase();
+    handler.read(region, new Object());
+    verify(factory).writeField(filedName2, COLUMN_VALUE_2, Object.class);
+    verify(factory, times(1)).writeField(any(), any(), any());
+    verify(factory).create();
+  }
+
+  @Test
+  public void throwsExceptionIfMoreThatOneResultReturned() throws Exception {
+    ResultSet result = mock(ResultSet.class);
+    setupResultSet(result);
+    when(result.next()).thenReturn(true);
+    when(result.getStatement()).thenReturn(mock(PreparedStatement.class));
+    when(statement.executeQuery()).thenReturn(result);
+
+    when(manager.getKeyColumnName(any(), anyString())).thenReturn("key");
+    when(cache.createPdxInstanceFactory(anyString(), anyBoolean()))
+        .thenReturn(mock(PdxInstanceFactory.class));
+
+    thrown.expect(IllegalStateException.class);
+    handler.read(region, new Object());
+  }
+
+  @Test
+  public void writeThrowsExceptionIfValueIsNullAndNotDoingDestroy() {
+    thrown.expect(IllegalArgumentException.class);
+    handler.write(region, Operation.UPDATE, new Object(), null);
+  }
+
+  @Test
+  public void insertActionSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement).setObject(2, COLUMN_VALUE_2);
+  }
+
+  @Test
+  public void updateActionSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement).setObject(2, COLUMN_VALUE_2);
+  }
+
+  @Test
+  public void destroyActionSucceeds() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement, times(1)).setObject(anyInt(), any());
+  }
+
+  @Test
+  public void destroyActionThatRemovesNoRowCompletesUnexceptionally() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenReturn(0);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+    verify(statement).setObject(1, COLUMN_VALUE_1);
+    verify(statement, times(1)).setObject(anyInt(), any());
+  }
+
+  @Test
+  public void destroyThrowExceptionWhenFail() throws Exception {
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.DESTROY, new Object(), value);
+  }
+
+  @Test
+  public void preparedStatementClearedAfterExecution() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).clearParameters();
+  }
+
+  @Test
+  public void whenInsertFailsUpdateSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(0);
+
+    PreparedStatement updateStatement = mock(PreparedStatement.class);
+    when(updateStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(updateStatement);
+
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(updateStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(updateStatement).clearParameters();
+  }
+
+  @Test
+  public void whenUpdateFailsInsertSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenReturn(0);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(insertStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenInsertFailsWithExceptionUpdateSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement updateStatement = mock(PreparedStatement.class);
+    when(updateStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(updateStatement);
+
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(updateStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(updateStatement).clearParameters();
+  }
+
+  @Test
+  public void whenUpdateFailsWithExceptionInsertSucceeds() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenReturn(1);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).executeUpdate();
+    verify(insertStatement).executeUpdate();
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenBothInsertAndUpdateFailExceptionIsThrown() throws Exception {
+    when(statement.executeUpdate()).thenThrow(SQLException.class);
+
+    PreparedStatement insertStatement = mock(PreparedStatement.class);
+    when(insertStatement.executeUpdate()).thenThrow(SQLException.class);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement)
+        .thenReturn(insertStatement);
+
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.UPDATE, new Object(), value);
+    verify(statement).clearParameters();
+    verify(insertStatement).clearParameters();
+  }
+
+  @Test
+  public void whenStatementUpdatesMultipleRowsExceptionThrown() throws Exception {
+    when(statement.executeUpdate()).thenReturn(2);
+    thrown.expect(IllegalStateException.class);
+    handler.write(region, Operation.CREATE, new Object(), value);
+    verify(statement).clearParameters();
+  }
+
+  private void setupManagerMock() throws SQLException {
+    ConnectionConfiguration connectionConfig = mock(ConnectionConfiguration.class);
+    when(manager.getConnectionConfig(any())).thenReturn(connectionConfig);
+
+    regionMapping = mock(RegionMapping.class);
+    when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
+    when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
+    when(manager.getMappingForRegion(any())).thenReturn(regionMapping);
+
+    List<ColumnValue> columnList = new ArrayList<>();
+    columnList.add(new ColumnValue(true, COLUMN_NAME_1, COLUMN_VALUE_1));
+    columnList.add(new ColumnValue(true, COLUMN_NAME_2, COLUMN_VALUE_2));
+    when(manager.getColumnToValueList(any(), any(), any(), any(), any())).thenReturn(columnList);
+
+    statement = mock(PreparedStatement.class);
+    when(manager.getPreparedStatement(any(), any(), any(), any(), anyInt())).thenReturn(statement);
+  }
+
+  private void setupResultSet(ResultSet result) throws SQLException {
+    ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+    when(result.getMetaData()).thenReturn(metaData);
+    when(metaData.getColumnCount()).thenReturn(2);
+
+    when(result.getObject(1)).thenReturn(COLUMN_VALUE_1);
+    when(metaData.getColumnName(1)).thenReturn(COLUMN_NAME_1);
+
+    when(result.getObject(2)).thenReturn(COLUMN_VALUE_2);
+    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
+  }
+
+  private void setupEmptyResultSet() throws SQLException {
+    ResultSet result = mock(ResultSet.class);
+    when(result.next()).thenReturn(false);
+    when(statement.executeQuery()).thenReturn(result);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
new file mode 100644
index 0000000..abfaf66
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class SqlStatementFactoryTest {
+  private static final String TABLE_NAME = "testTable";
+  private static final String KEY_COLUMN_NAME = "column1";
+
+  private List<ColumnValue> columnValues = new ArrayList<>();
+  private SqlStatementFactory factory = new SqlStatementFactory();
+
+  @Before
+  public void setup() {
+    columnValues.add(new ColumnValue(false, "column0", null));
+    columnValues.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    columnValues.add(new ColumnValue(false, "column2", null));
+  }
+
+  @Test
+  public void getSelectQueryString() throws Exception {
+    List<ColumnValue> keyColumn = new ArrayList<>();
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    String statement = factory.createSelectQueryString(TABLE_NAME, keyColumn);
+    String expectedStatement =
+        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getDestroySqlString() throws Exception {
+    List<ColumnValue> keyColumn = new ArrayList<>();
+    keyColumn.add(new ColumnValue(true, KEY_COLUMN_NAME, null));
+    String statement = factory.createDestroySqlString(TABLE_NAME, keyColumn);
+    String expectedStatement =
+        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getUpdateSqlString() throws Exception {
+    String statement = factory.createUpdateSqlString(TABLE_NAME, columnValues);
+    String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
+        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(2).getColumnName(),
+        KEY_COLUMN_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getInsertSqlString() throws Exception {
+    String statement = factory.createInsertSqlString(TABLE_NAME, columnValues);
+    String expectedStatement = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?,?,?)",
+        TABLE_NAME, columnValues.get(0).getColumnName(), columnValues.get(1).getColumnName(),
+        columnValues.get(2).getColumnName());
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
new file mode 100644
index 0000000..b387d72
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.extension.ExtensionPoint;
+
+/**
+ * Generates fake JdbcConnectorService with Connections and RegionMappings for tests.
+ */
+public class TestConfigService {
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String REGION_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+  private static final String CONNECTION_CONFIG_NAME = "testConnectionConfig";
+
+  public static JdbcConnectorService getTestConfigService() {
+    InternalCache cache = mock(InternalCache.class);
+    when(cache.getExtensionPoint()).thenReturn(mock(ExtensionPoint.class));
+
+    JdbcConnectorService service = new JdbcConnectorService();
+    service.init(cache);
+    service.addOrUpdateConnectionConfig(createConnectionConfig());
+    service.addOrUpdateRegionMapping(createRegionMapping());
+    return service;
+  }
+
+  private static RegionMapping createRegionMapping() {
+    return new RegionMapping(REGION_NAME, null, REGION_TABLE_NAME, CONNECTION_CONFIG_NAME, false,
+        Collections.emptyMap());
+  }
+
+  private static ConnectionConfiguration createConnectionConfig() {
+    return new ConnectionConfiguration(CONNECTION_CONFIG_NAME, CONNECTION_URL, null, null);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestableConnectionManager.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestableConnectionManager.java
new file mode 100644
index 0000000..01f73f8
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TestableConnectionManager.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+public class TestableConnectionManager extends ConnectionManager {
+
+  public TestableConnectionManager(InternalJdbcConnectorService configService) {
+    super(configService);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ConnectionConfigBuilderTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ConnectionConfigBuilderTest.java
new file mode 100644
index 0000000..e97ecc1
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ConnectionConfigBuilderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ConnectionConfigBuilderTest {
+  @Test
+  public void createsAllNullObjectIfNothingSet() {
+    ConnectionConfiguration config = new ConnectionConfigBuilder().build();
+    assertThat(config.getName()).isNull();
+    assertThat(config.getUrl()).isNull();
+    assertThat(config.getUser()).isNull();
+    assertThat(config.getPassword()).isNull();
+  }
+
+  @Test
+  public void createsObjectWithCorrectValues() {
+    ConnectionConfigBuilder builder = new ConnectionConfigBuilder();
+    builder.withName("name").withUrl("url").withUser("user").withPassword("password");
+    ConnectionConfiguration config = builder.build();
+    assertThat(config.getName()).isEqualTo("name");
+    assertThat(config.getUrl()).isEqualTo("url");
+    assertThat(config.getUser()).isEqualTo("user");
+    assertThat(config.getPassword()).isEqualTo("password");
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
new file mode 100644
index 0000000..8c290b7
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.connectors.jdbc.internal.xml.ElementType.CONNECTION;
+import static org.apache.geode.connectors.jdbc.internal.xml.ElementType.CONNECTION_SERVICE;
+import static org.apache.geode.connectors.jdbc.internal.xml.ElementType.FIELD_MAPPING;
+import static org.apache.geode.connectors.jdbc.internal.xml.ElementType.REGION_MAPPING;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.COLUMN_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.CONNECTION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.FIELD_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PDX_CLASS;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PRIMARY_KEY_IN_VALUE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.REGION;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.TABLE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Stack;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.xml.sax.Attributes;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheXmlException;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.extension.ExtensionPoint;
+import org.apache.geode.internal.cache.xmlcache.CacheCreation;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ElementTypeTest {
+  private Stack<Object> stack = new Stack<>();
+  private Attributes attributes;
+  private CacheCreation cacheCreation;
+  private ExtensionPoint<Cache> extensionPoint;
+
+  @Before
+  public void setup() {
+    attributes = mock(Attributes.class);
+    cacheCreation = mock(CacheCreation.class);
+    extensionPoint = mock(ExtensionPoint.class);
+    when(cacheCreation.getExtensionPoint()).thenReturn(extensionPoint);
+  }
+
+  @Test
+  public void gettingElementTypeByNameReturnsCorrectType() {
+    assertThat(ElementType.getTypeFromName(CONNECTION_SERVICE.getTypeName()))
+        .isSameAs(CONNECTION_SERVICE);
+    assertThat(ElementType.getTypeFromName(CONNECTION.getTypeName())).isSameAs(CONNECTION);
+    assertThat(ElementType.getTypeFromName(REGION_MAPPING.getTypeName())).isSameAs(REGION_MAPPING);
+    assertThat(ElementType.getTypeFromName(FIELD_MAPPING.getTypeName())).isSameAs(FIELD_MAPPING);
+  }
+
+  @Test
+  public void gettingElementTypeThatDoesNotExistThrowsException() {
+    assertThatThrownBy(() -> ElementType.getTypeFromName("non-existant element"))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void startElementConnectionServiceThrowsWithoutCacheCreation() {
+    stack.push(new Object());
+    assertThatThrownBy(() -> CONNECTION_SERVICE.startElement(stack, attributes))
+        .isInstanceOf(CacheXmlException.class);
+  }
+
+  @Test
+  public void startElementConnectionService() {
+    stack.push(cacheCreation);
+    CONNECTION_SERVICE.startElement(stack, attributes);
+    verify(extensionPoint, times(1)).addExtension(any(JdbcServiceConfiguration.class));
+    assertThat(stack.peek()).isInstanceOf(JdbcServiceConfiguration.class);
+  }
+
+  @Test
+  public void endElementConnectionService() {
+    stack.push(new Object());
+    CONNECTION_SERVICE.endElement(stack);
+    assertThat(stack).isEmpty();
+  }
+
+  @Test
+  public void startElementConnectionThrowsWithoutJdbcServiceConfiguration() {
+    stack.push(new Object());
+    assertThatThrownBy(() -> CONNECTION.startElement(stack, attributes))
+        .isInstanceOf(CacheXmlException.class);
+  }
+
+  @Test
+  public void startElementConnection() {
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+
+    when(attributes.getValue(JdbcConnectorServiceXmlParser.NAME)).thenReturn("connectionName");
+    when(attributes.getValue(JdbcConnectorServiceXmlParser.URL)).thenReturn("url");
+    when(attributes.getValue(JdbcConnectorServiceXmlParser.USER)).thenReturn("username");
+    when(attributes.getValue(JdbcConnectorServiceXmlParser.PASSWORD)).thenReturn("secret");
+
+    CONNECTION.startElement(stack, attributes);
+    ConnectionConfiguration config = ((ConnectionConfigBuilder) stack.pop()).build();
+
+    assertThat(config.getName()).isEqualTo("connectionName");
+    assertThat(config.getUrl()).isEqualTo("url");
+    assertThat(config.getUser()).isEqualTo("username");
+    assertThat(config.getPassword()).isEqualTo("secret");
+  }
+
+  @Test
+  public void endElementConnection() {
+    ConnectionConfigBuilder builder = mock(ConnectionConfigBuilder.class);
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+    stack.push(builder);
+
+    CONNECTION.endElement(stack);
+
+    assertThat(stack.size()).isEqualTo(1);
+    verify(serviceConfiguration, times(1)).addConnectionConfig(any());
+  }
+
+  @Test
+  public void startElementRegionMappingThrowsWithoutJdbcServiceConfiguration() {
+    stack.push(new Object());
+    assertThatThrownBy(() -> REGION_MAPPING.startElement(stack, attributes))
+        .isInstanceOf(CacheXmlException.class);
+  }
+
+  @Test
+  public void startElementRegionMapping() {
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+
+    when(attributes.getValue(REGION)).thenReturn("region");
+    when(attributes.getValue(CONNECTION_NAME)).thenReturn("connectionName");
+    when(attributes.getValue(TABLE)).thenReturn("table");
+    when(attributes.getValue(PDX_CLASS)).thenReturn("pdxClass");
+    when(attributes.getValue(PRIMARY_KEY_IN_VALUE)).thenReturn("true");
+
+    ElementType.REGION_MAPPING.startElement(stack, attributes);
+
+    RegionMapping regionMapping = ((RegionMappingBuilder) stack.pop()).build();
+    assertThat(regionMapping.getRegionName()).isEqualTo("region");
+    assertThat(regionMapping.getConnectionConfigName()).isEqualTo("connectionName");
+    assertThat(regionMapping.getTableName()).isEqualTo("table");
+    assertThat(regionMapping.getPdxClassName()).isEqualTo("pdxClass");
+    assertThat(regionMapping.isPrimaryKeyInValue()).isEqualTo(true);
+  }
+
+  @Test
+  public void endElementRegionMapping() {
+    RegionMappingBuilder builder = mock(RegionMappingBuilder.class);
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+    stack.push(builder);
+
+    ElementType.REGION_MAPPING.endElement(stack);
+
+    assertThat(stack.size()).isEqualTo(1);
+    verify(serviceConfiguration, times(1)).addRegionMapping(any());
+  }
+
+  @Test
+  public void startElementFieldMappingThrowsWithoutRegionMappingBuilder() {
+    stack.push(new Object());
+    assertThatThrownBy(() -> FIELD_MAPPING.startElement(stack, attributes))
+        .isInstanceOf(CacheXmlException.class);
+  }
+
+  @Test
+  public void startElementFieldMapping() {
+    RegionMappingBuilder builder = new RegionMappingBuilder();
+    stack.push(builder);
+    when(attributes.getValue(FIELD_NAME)).thenReturn("fieldName");
+    when(attributes.getValue(COLUMN_NAME)).thenReturn("columnName");
+
+    ElementType.FIELD_MAPPING.startElement(stack, attributes);
+    RegionMapping regionMapping = ((RegionMappingBuilder) stack.pop()).build();
+
+    assertThat(regionMapping.getColumnNameForField("fieldName")).isEqualTo("columnName");
+  }
+
+  @Test
+  public void endElementFieldMapping() {
+    RegionMappingBuilder builder = mock(RegionMappingBuilder.class);
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+    stack.push(builder);
+
+    ElementType.FIELD_MAPPING.endElement(stack);
+
+    assertThat(stack.size()).isEqualTo(2);
+    verifyZeroInteractions(builder);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGeneratorIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGeneratorIntegrationTest.java
new file mode 100644
index 0000000..5b6d260
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGeneratorIntegrationTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlGenerator.PREFIX;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.COLUMN_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.CONNECTION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.FIELD_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAMESPACE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PASSWORD;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PDX_CLASS;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PRIMARY_KEY_IN_VALUE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.REGION;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.TABLE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.URL;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.USER;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class JdbcConnectorServiceXmlGeneratorIntegrationTest {
+
+  private InternalCache cache;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setup() {
+    cache = (InternalCache) new CacheFactory().create();
+  }
+
+  @After
+  public void tearDown() {
+    cache.close();
+  }
+
+  @Test
+  public void cacheGetServiceReturnsJdbcConnectorService() {
+    assertThat(cache.getService(InternalJdbcConnectorService.class)).isNotNull();
+  }
+
+  @Test
+  public void serviceWithoutInformationDoesNotPersist() throws Exception {
+    cache.getService(InternalJdbcConnectorService.class);
+    generateXml();
+    Document document = getCacheXmlDocument();
+    NodeList elements = getElementsByName(document, ElementType.CONNECTION_SERVICE);
+    assertThat(elements.getLength()).isZero();
+  }
+
+  @Test
+  public void serviceWithConnectionsHasCorrectXml() throws Exception {
+    InternalJdbcConnectorService service = cache.getService(InternalJdbcConnectorService.class);
+    ConnectionConfiguration config = new ConnectionConfigBuilder().withName("name").withUrl("url")
+        .withUser("username").withPassword("secret").build();
+    service.addOrUpdateConnectionConfig(config);
+
+    generateXml();
+
+    Document document = getCacheXmlDocument();
+    NodeList serviceElements = getElementsByName(document, ElementType.CONNECTION_SERVICE);
+    assertThat(serviceElements.getLength()).isEqualTo(1);
+
+    Element serviceElement = (Element) serviceElements.item(0);
+    assertThat(serviceElement.getAttribute("xmlns:" + PREFIX)).isEqualTo(NAMESPACE);
+
+    NodeList connectionElements = getElementsByName(document, ElementType.CONNECTION);
+    assertThat(connectionElements.getLength()).isEqualTo(1);
+
+    Element connectionElement = (Element) connectionElements.item(0);
+    assertThat(connectionElement.getAttribute(NAME)).isEqualTo("name");
+    assertThat(connectionElement.getAttribute(URL)).isEqualTo("url");
+    assertThat(connectionElement.getAttribute(USER)).isEqualTo("username");
+    assertThat(connectionElement.getAttribute(PASSWORD)).isEqualTo("secret");
+  }
+
+  @Test
+  public void generatesXmlContainingRegionMapping() throws Exception {
+    InternalJdbcConnectorService service = cache.getService(InternalJdbcConnectorService.class);
+    RegionMappingBuilder regionMappingBuilder = new RegionMappingBuilder()
+        .withRegionName("regionName").withPdxClassName("pdxClassName").withTableName("tableName")
+        .withConnectionConfigName("connectionConfigName").withPrimaryKeyInValue("true");
+    regionMappingBuilder.withFieldToColumnMapping("fieldName1", "columnMapping1");
+    regionMappingBuilder.withFieldToColumnMapping("fieldName2", "columnMapping2");
+    RegionMapping regionMapping = regionMappingBuilder.build();
+    service.addOrUpdateRegionMapping(regionMapping);
+
+    generateXml();
+
+    Document document = getCacheXmlDocument();
+    NodeList serviceElements = getElementsByName(document, ElementType.CONNECTION_SERVICE);
+    assertThat(serviceElements.getLength()).isEqualTo(1);
+
+    NodeList mappingElements = getElementsByName(document, ElementType.REGION_MAPPING);
+    assertThat(mappingElements.getLength()).isEqualTo(1);
+
+    Element mappingElement = (Element) mappingElements.item(0);
+    assertThat(mappingElement.getAttribute(REGION)).isEqualTo("regionName");
+    assertThat(mappingElement.getAttribute(PDX_CLASS)).isEqualTo("pdxClassName");
+    assertThat(mappingElement.getAttribute(TABLE)).isEqualTo("tableName");
+    assertThat(mappingElement.getAttribute(CONNECTION_NAME)).isEqualTo("connectionConfigName");
+    assertThat(mappingElement.getAttribute(PRIMARY_KEY_IN_VALUE)).isEqualTo("true");
+
+    NodeList fieldMappingElements = getElementsByName(mappingElement, ElementType.FIELD_MAPPING);
+    assertThat(fieldMappingElements.getLength()).isEqualTo(2);
+    validatePresenceOfFieldMapping(fieldMappingElements, "fieldName1", "columnMapping1");
+    validatePresenceOfFieldMapping(fieldMappingElements, "fieldName2", "columnMapping2");
+  }
+
+  private void validatePresenceOfFieldMapping(NodeList elements, String fieldName,
+      String columnName) {
+    for (int i = 0; i < elements.getLength(); i++) {
+      Element fieldMapping = (Element) elements.item(i);
+      if (fieldMapping.getAttribute(FIELD_NAME).equals(fieldName)) {
+        assertThat(fieldMapping.getAttribute(COLUMN_NAME)).isEqualTo(columnName);
+        return;
+      }
+    }
+    fail("Field name '" + fieldName + "' did not match those provided");
+  }
+
+  private NodeList getElementsByName(Document document, ElementType elementType) {
+    String name = getTagName(elementType);
+    return document.getElementsByTagName(name);
+  }
+
+  private NodeList getElementsByName(Element element, ElementType elementType) {
+    String name = getTagName(elementType);
+    return element.getElementsByTagName(name);
+  }
+
+  private String getTagName(ElementType elementType) {
+    return PREFIX + ":" + elementType.getTypeName();
+  }
+
+  private Document getCacheXmlDocument()
+      throws IOException, SAXException, ParserConfigurationException {
+    File cacheXml = new File(temporaryFolder.getRoot(), "cache.xml");
+    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+    DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+    Document document = dBuilder.parse(cacheXml);
+    document.getDocumentElement().normalize();
+    return document;
+  }
+
+  private void generateXml() throws IOException {
+    File cacheXml = new File(temporaryFolder.getRoot(), "cache.xml");
+    PrintWriter printWriter = new PrintWriter(new FileWriter(cacheXml));
+    CacheXmlGenerator.generate(cache, printWriter, true, false, false);
+    printWriter.flush();
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGeneratorTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGeneratorTest.java
new file mode 100644
index 0000000..b91bd91
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlGeneratorTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAMESPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcConnectorServiceXmlGeneratorTest {
+
+  @Test
+  public void returnsCorrectNamespace() {
+    JdbcConnectorServiceXmlGenerator generator = new JdbcConnectorServiceXmlGenerator(null, null);
+    assertThat(generator.getNamespaceUri()).isEqualTo(NAMESPACE);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlIntegrationTest.java
new file mode 100644
index 0000000..ac7fcf5
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlIntegrationTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.JdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class JdbcConnectorServiceXmlIntegrationTest {
+
+  private InternalCache cache;
+  private File cacheXml;
+  private ConnectionConfiguration config1;
+  private ConnectionConfiguration config2;
+  private RegionMapping regionMapping1;
+  private RegionMapping regionMapping2;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws Exception {
+    cache = (InternalCache) new CacheFactory().create();
+    configureService();
+    cacheXml = generateXml();
+    cache.close();
+  }
+
+  @After
+  public void tearDown() {
+    cache.close();
+  }
+
+  @Test
+  public void canRecreateJdbcConnectorServiceFromXml() throws Exception {
+    cache =
+        (InternalCache) new CacheFactory().set(CACHE_XML_FILE, cacheXml.getAbsolutePath()).create();
+
+    JdbcConnectorService service =
+        (JdbcConnectorService) cache.getExtensionPoint().getExtensions().iterator().next();
+    assertThat(service.getConnectionConfig(config1.getName())).isEqualTo(config1);
+    assertThat(service.getConnectionConfig(config2.getName())).isEqualTo(config2);
+    assertThat(service.getMappingForRegion(regionMapping1.getRegionName()))
+        .isEqualTo(regionMapping1);
+    assertThat(service.getMappingForRegion(regionMapping2.getRegionName()))
+        .isEqualTo(regionMapping2);
+  }
+
+  private void configureService() {
+    InternalJdbcConnectorService service = cache.getService(InternalJdbcConnectorService.class);
+    config1 = new ConnectionConfigBuilder().withName("connection1").withUrl("url1")
+        .withUser("username1").withPassword("secret1").build();
+    config2 = new ConnectionConfigBuilder().withName("connection2").withUrl("url2")
+        .withUser("username2").withPassword("secret2").build();
+    service.addOrUpdateConnectionConfig(config1);
+    service.addOrUpdateConnectionConfig(config2);
+
+    RegionMappingBuilder regionMappingBuilder1 = new RegionMappingBuilder()
+        .withRegionName("regionName1").withPdxClassName("pdxClassName1").withTableName("tableName1")
+        .withConnectionConfigName("connection1").withPrimaryKeyInValue("true");
+    regionMappingBuilder1.withFieldToColumnMapping("fieldName1", "columnMapping1");
+    regionMappingBuilder1.withFieldToColumnMapping("fieldName2", "columnMapping2");
+    regionMapping1 = regionMappingBuilder1.build();
+
+    RegionMappingBuilder regionMappingBuilder2 = new RegionMappingBuilder()
+        .withRegionName("regionName2").withPdxClassName("pdxClassName2").withTableName("tableName2")
+        .withConnectionConfigName("connection2").withPrimaryKeyInValue("false");
+    regionMappingBuilder1.withFieldToColumnMapping("fieldName3", "columnMapping3");
+    regionMappingBuilder1.withFieldToColumnMapping("fieldName4", "columnMapping4");
+    regionMapping2 = regionMappingBuilder2.build();
+
+    service.addOrUpdateRegionMapping(regionMapping1);
+    service.addOrUpdateRegionMapping(regionMapping2);
+  }
+
+  private File generateXml() throws IOException {
+    File cacheXml = new File(temporaryFolder.getRoot(), "cache.xml");
+    PrintWriter printWriter = new PrintWriter(new FileWriter(cacheXml));
+    CacheXmlGenerator.generate(cache, printWriter, true, false, false);
+    printWriter.flush();
+    return cacheXml;
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParserTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParserTest.java
new file mode 100644
index 0000000..95a4d45
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParserTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.connectors.jdbc.internal.xml.ElementType.CONNECTION_SERVICE;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAMESPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Stack;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.xml.sax.Attributes;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.extension.ExtensionPoint;
+import org.apache.geode.internal.cache.xmlcache.CacheCreation;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcConnectorServiceXmlParserTest {
+
+  private Stack<Object> stack = new Stack<>();
+  private Attributes attributes;
+  private CacheCreation cacheCreation;
+  private ExtensionPoint<Cache> extensionPoint;
+
+  @Before
+  public void setup() {
+    attributes = mock(Attributes.class);
+    cacheCreation = mock(CacheCreation.class);
+    extensionPoint = mock(ExtensionPoint.class);
+    when(cacheCreation.getExtensionPoint()).thenReturn(extensionPoint);
+  }
+
+  @Test
+  public void getNamespaceUriReturnsNamespace() {
+    assertThat(new JdbcConnectorServiceXmlParser().getNamespaceUri()).isEqualTo(NAMESPACE);
+  }
+
+  @Test
+  public void startElementCreatesJdbcServiceConfiguration() throws Exception {
+    JdbcConnectorServiceXmlParser parser = new JdbcConnectorServiceXmlParser();
+    stack.push(cacheCreation);
+    parser.setStack(stack);
+
+    parser.startElement(NAMESPACE, CONNECTION_SERVICE.getTypeName(), null, attributes);
+
+    assertThat(stack.pop()).isInstanceOf(JdbcServiceConfiguration.class);
+  }
+
+  @Test
+  public void startElementWithWrongUriDoesNothing() throws Exception {
+    JdbcConnectorServiceXmlParser parser = new JdbcConnectorServiceXmlParser();
+    stack.push(cacheCreation);
+    parser.setStack(stack);
+
+    parser.startElement("wrongNamespace", CONNECTION_SERVICE.getTypeName(), null, attributes);
+
+    assertThat(stack.pop()).isEqualTo(cacheCreation);
+  }
+
+  @Test
+  public void endElementRemovesJdbcServiceConfiguration() throws Exception {
+    JdbcConnectorServiceXmlParser parser = new JdbcConnectorServiceXmlParser();
+    stack.push(cacheCreation);
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+    parser.setStack(stack);
+
+    parser.endElement(NAMESPACE, CONNECTION_SERVICE.getTypeName(), null);
+
+    assertThat(stack.pop()).isEqualTo(cacheCreation);
+    verifyZeroInteractions(serviceConfiguration);
+  }
+
+  @Test
+  public void endElementRemovesWithWrongUriDoesNothing() throws Exception {
+    JdbcConnectorServiceXmlParser parser = new JdbcConnectorServiceXmlParser();
+    stack.push(cacheCreation);
+    JdbcServiceConfiguration serviceConfiguration = mock(JdbcServiceConfiguration.class);
+    stack.push(serviceConfiguration);
+    parser.setStack(stack);
+
+    parser.endElement("wrongNamespace", CONNECTION_SERVICE.getTypeName(), null);
+
+    assertThat(stack.pop()).isEqualTo(serviceConfiguration);
+    verifyZeroInteractions(serviceConfiguration);
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcServiceConfigurationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcServiceConfigurationTest.java
new file mode 100644
index 0000000..939db7c
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcServiceConfigurationTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.NAMESPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.connectors.jdbc.internal.ConnectionConfiguration;
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JdbcServiceConfigurationTest {
+
+  private JdbcServiceConfiguration configuration;
+
+  private InternalCache cache;
+  private InternalJdbcConnectorService service;
+  private ConnectionConfiguration connection1;
+  private ConnectionConfiguration connection2;
+  private RegionMapping mapping1;
+  private RegionMapping mapping2;
+
+  @Before
+  public void setUp() {
+    connection1 = mock(ConnectionConfiguration.class);
+    connection2 = mock(ConnectionConfiguration.class);
+    mapping1 = mock(RegionMapping.class);
+    mapping2 = mock(RegionMapping.class);
+
+    service = mock(InternalJdbcConnectorService.class);
+    cache = mock(InternalCache.class);
+    when(cache.getService(InternalJdbcConnectorService.class)).thenReturn(service);
+
+    configuration = new JdbcServiceConfiguration();
+  }
+
+  @Test
+  public void getXmlGeneratorReturnsJdbcConnectorServiceXmlGenerator() {
+    XmlGenerator<Cache> generator = configuration.getXmlGenerator();
+
+    assertThat(generator).isInstanceOf(JdbcConnectorServiceXmlGenerator.class);
+  }
+
+  @Test
+  public void getXmlGeneratorReturnsGeneratorWithJdbcConnectorNamespace() {
+    XmlGenerator<Cache> generator = configuration.getXmlGenerator();
+
+    assertThat(generator.getNamespaceUri()).isEqualTo(NAMESPACE);
+  }
+
+  @Test
+  public void getXmlGeneratorReturnsEmptyGeneratorByDefault() {
+    JdbcConnectorServiceXmlGenerator generator =
+        (JdbcConnectorServiceXmlGenerator) configuration.getXmlGenerator();
+
+    assertThat(generator.getConnections()).isEmpty();
+    assertThat(generator.getMappings()).isEmpty();
+  }
+
+  @Test
+  public void getXmlGeneratorWithConnections() {
+    configuration.addConnectionConfig(connection1);
+    configuration.addConnectionConfig(connection2);
+
+    JdbcConnectorServiceXmlGenerator generator =
+        (JdbcConnectorServiceXmlGenerator) configuration.getXmlGenerator();
+
+    assertThat(generator.getConnections()).containsExactly(connection1, connection2);
+  }
+
+  @Test
+  public void getXmlGeneratorWithRegionMappings() {
+    configuration.addRegionMapping(mapping1);
+    configuration.addRegionMapping(mapping2);
+
+    JdbcConnectorServiceXmlGenerator generator =
+        (JdbcConnectorServiceXmlGenerator) configuration.getXmlGenerator();
+
+    assertThat(generator.getMappings()).containsExactly(mapping1, mapping2);
+  }
+
+  @Test
+  public void onCreateWithNoConnectionsOrMappings() {
+    configuration.onCreate(cache, cache);
+    verifyZeroInteractions(service);
+  }
+
+  @Test
+  public void onCreateWithConnections() {
+    configuration.addConnectionConfig(connection1);
+    configuration.addConnectionConfig(connection2);
+
+    configuration.onCreate(cache, cache);
+
+    verify(service, times(1)).addOrUpdateConnectionConfig(connection1);
+    verify(service, times(1)).addOrUpdateConnectionConfig(connection2);
+  }
+
+  @Test
+  public void onCreateWithRegionMappings() {
+    configuration.addRegionMapping(mapping1);
+    configuration.addRegionMapping(mapping2);
+
+    configuration.onCreate(cache, cache);
+
+    verify(service, times(1)).addOrUpdateRegionMapping(mapping1);
+    verify(service, times(1)).addOrUpdateRegionMapping(mapping2);
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingBuilderTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingBuilderTest.java
new file mode 100644
index 0000000..9953d95
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingBuilderTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal.xml;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.connectors.jdbc.internal.RegionMapping;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class RegionMappingBuilderTest {
+
+  @Test
+  public void createsMappingWithDefaultValuesIfNonAreSet() {
+    RegionMapping regionMapping = new RegionMappingBuilder().build();
+
+    assertThat(regionMapping.getRegionName()).isNull();
+    assertThat(regionMapping.getTableName()).isNull();
+    assertThat(regionMapping.getPdxClassName()).isNull();
+    assertThat(regionMapping.getConnectionConfigName()).isNull();
+    assertThat(regionMapping.isPrimaryKeyInValue()).isFalse();
+  }
+
+  @Test
+  public void createsMappingWithSpecifiedValues() {
+    RegionMappingBuilder builder = new RegionMappingBuilder();
+    RegionMapping regionMapping = builder.withTableName("tableName").withRegionName("regionName")
+        .withPrimaryKeyInValue("true").withPdxClassName("pdxClassName")
+        .withConnectionConfigName("configName").withFieldToColumnMapping("fieldName", "columnName")
+        .build();
+
+    assertThat(regionMapping.getRegionName()).isEqualTo("regionName");
+    assertThat(regionMapping.getTableName()).isEqualTo("tableName");
+    assertThat(regionMapping.getPdxClassName()).isEqualTo("pdxClassName");
+    assertThat(regionMapping.getConnectionConfigName()).isEqualTo("configName");
+    assertThat(regionMapping.isPrimaryKeyInValue()).isTrue();
+    assertThat(regionMapping.getColumnNameForField("fieldName")).isEqualTo("columnName");
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
index cc4d3d1..f5b9266 100644
--- a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
+++ b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
@@ -35,6 +35,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.cache.CachePerfStats;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.pdx.PdxInstanceFactory;
 
 /**
  * Factory methods for fake objects for use in test.
@@ -62,6 +63,7 @@ public class Fakes {
     InternalDistributedSystem system = mock(InternalDistributedSystem.class);
     DistributionConfig config = mock(DistributionConfig.class);
     DistributionManager distributionManager = mock(DistributionManager.class);
+    PdxInstanceFactory pdxInstanceFactory = mock(PdxInstanceFactory.class);
     CancelCriterion systemCancelCriterion = mock(CancelCriterion.class);
     DSClock clock = mock(DSClock.class);
     LogWriter logger = mock(LogWriter.class);
@@ -81,6 +83,7 @@ public class Fakes {
     when(cache.getCancelCriterion()).thenReturn(systemCancelCriterion);
     when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
     when(cache.getSecurityService()).thenReturn(mock(SecurityService.class));
+    when(cache.createPdxInstanceFactory(any())).thenReturn(pdxInstanceFactory);
 
     when(system.getDistributedMember()).thenReturn(member);
     when(system.getConfig()).thenReturn(config);
@@ -122,7 +125,7 @@ public class Fakes {
     when(region.getCache()).thenReturn(cache);
     when(region.getRegionService()).thenReturn(cache);
     when(region.getName()).thenReturn(name);
-
+    when(region.getFullPath()).thenReturn("/" + name);
     return region;
   }
 
diff --git a/settings.gradle b/settings.gradle
index 13a665c..a84f352 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -30,6 +30,7 @@ include 'geode-lucene'
 include 'geode-old-client-support'
 include 'geode-wan'
 include 'geode-cq'
+include 'geode-connectors'
 include 'geode-benchmarks'
 include 'geode-client-protocol'
 include 'extensions/geode-modules'

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.