Posted to commits@geode.apache.org by ds...@apache.org on 2018/12/19 17:44:20 UTC

[geode] branch develop updated: GEODE-6194: add composite keys to jdbc (#3015)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new bd9c284  GEODE-6194: add composite keys to jdbc (#3015)
bd9c284 is described below

commit bd9c284975bfb0b8caef15ab6205af972d76cf41
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Dec 19 09:44:10 2018 -0800

    GEODE-6194: add composite keys to jdbc (#3015)
    
    Multiple --id column names can now be specified on "create jdbc-mapping"
    using a comma-separated list.
    The key object itself must then be a JSON string whose field names are
    the corresponding PDX field names for each id column name.
    The extension team is looking into changing composite keys for both GGC and JDBC
    to something other than JSON.
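
Editor's note: a minimal sketch of the new usage, modeled on the integration tests in the diff below. It assumes a region already mapped with "create jdbc-mapping ... --id=id,age"; the class and variable names here are illustrative and not part of the commit. The region key is the JSON string itself, with one field per id column, named after the corresponding PDX field.

import org.apache.geode.cache.Region;
import org.apache.geode.pdx.PdxInstance;
import org.json.JSONObject;

class CompositeKeyUsageSketch {
  // Build the JSON-string key for a mapping created with --id=id,age.
  static String compositeKey(PdxInstance employee) {
    JSONObject key = new JSONObject();
    key.put("id", employee.getField("id"));   // PDX field backing the "id" key column
    key.put("age", employee.getField("age")); // PDX field backing the "age" key column
    return key.toString();                    // e.g. {"id":"1","age":55}
  }

  static void putAndGet(Region<String, PdxInstance> region, PdxInstance employee) {
    String key = compositeKey(employee);
    region.put(key, employee);                // written to the table keyed by (id, age)
    PdxInstance readBack = region.get(key);   // read back through the JDBC loader with the same key
  }
}
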
---
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       | 116 +++++++++-
 .../geode/connectors/jdbc/JdbcDistributedTest.java |  41 +++-
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |  35 ++-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java | 103 ++++++++-
 .../TableMetaDataManagerIntegrationTest.java       |  14 +-
 .../jdbc/internal/TestConfigService.java           |  17 +-
 .../connectors/jdbc/internal/EntryColumnData.java  |   9 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |  89 ++++++--
 .../jdbc/internal/SqlStatementFactory.java         |  70 ++++--
 .../connectors/jdbc/internal/TableMetaData.java    |  11 +-
 .../jdbc/internal/TableMetaDataManager.java        |  49 ++--
 .../jdbc/internal/TableMetaDataView.java           |   4 +-
 .../jdbc/internal/cli/CreateMappingCommand.java    |   2 +-
 .../connectors/jdbc/internal/SqlHandlerTest.java   | 253 +++++++++++++++++++--
 .../jdbc/internal/SqlStatementFactoryTest.java     |  81 ++++++-
 .../jdbc/internal/SqlToPdxInstanceCreatorTest.java |   5 +-
 .../jdbc/internal/TableMetaDataManagerTest.java    |  76 +++++--
 17 files changed, 811 insertions(+), 164 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 42a9369..b7d6d38 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.awaitility.core.ThrowingRunnable;
+import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,6 +38,7 @@ import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.WritablePdxInstance;
 
 public abstract class JdbcAsyncWriterIntegrationTest {
 
@@ -59,19 +61,22 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   public void setup() throws Exception {
     cache = (InternalCache) new CacheFactory().set("locators", "").set("mcast-port", "0")
         .setPdxReadSerialized(false).create();
-    employees = createRegionWithJDBCAsyncWriter(REGION_TABLE_NAME);
     connection = getConnection();
     statement = connection.createStatement();
     statement.execute("Create Table " + REGION_TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
     pdxEmployee1 = cache.createPdxInstanceFactory(Employee.class.getName())
-        .writeString("name", "Emp1").writeInt("age", 55).create();
+        .writeString("id", "1").writeString("name", "Emp1").writeInt("age", 55).create();
     pdxEmployee2 = cache.createPdxInstanceFactory(Employee.class.getName())
-        .writeString("name", "Emp2").writeInt("age", 21).create();
+        .writeString("id", "2").writeString("name", "Emp2").writeInt("age", 21).create();
     employee1 = (Employee) pdxEmployee1.getObject();
     employee2 = (Employee) pdxEmployee2.getObject();
   }
 
+  private void setupRegion(String ids) throws RegionMappingExistsException {
+    employees = createRegionWithJDBCAsyncWriter(REGION_TABLE_NAME, ids);
+  }
+
   @After
   public void tearDown() throws Exception {
     cache.close();
@@ -97,7 +102,8 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   public abstract String getConnectionUrl();
 
   @Test
-  public void validateJDBCAsyncWriterTotalEvents() {
+  public void validateJDBCAsyncWriterTotalEvents() throws RegionMappingExistsException {
+    setupRegion(null);
     employees.put("1", pdxEmployee1);
     employees.put("2", pdxEmployee2);
 
@@ -106,6 +112,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
+    setupRegion(null);
     PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
         .writeObject("age", 55).writeInt("id", 3).create();
     employees.put("1", pdx1);
@@ -119,7 +126,8 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   }
 
   @Test
-  public void putNonPdxInstanceFails() {
+  public void putNonPdxInstanceFails() throws RegionMappingExistsException {
+    setupRegion(null);
     Region nonPdxEmployees = this.employees;
     nonPdxEmployees.put("1", "non pdx instance");
 
@@ -129,7 +137,9 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   }
 
   @Test
-  public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
+  public void putNonPdxInstanceThatIsPdxSerializable()
+      throws SQLException, RegionMappingExistsException {
+    setupRegion(null);
     Region nonPdxEmployees = this.employees;
     Employee value = new Employee("2", "Emp2", 22);
     nonPdxEmployees.put("2", value);
@@ -144,6 +154,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canDestroyFromTable() throws Exception {
+    setupRegion(null);
     employees.put("1", pdxEmployee1);
     employees.put("2", pdxEmployee2);
 
@@ -160,7 +171,66 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   }
 
   @Test
+  public void canDestroyFromTableWithCompositeKey() throws Exception {
+    setupRegion("id,age");
+    JSONObject compositeKey1 = new JSONObject();
+    compositeKey1.put("id", pdxEmployee1.getField("id"));
+    compositeKey1.put("age", pdxEmployee1.getField("age"));
+    JSONObject compositeKey2 = new JSONObject();
+    compositeKey2.put("id", pdxEmployee2.getField("id"));
+    compositeKey2.put("age", pdxEmployee2.getField("age"));
+    employees.put(compositeKey1.toString(), pdxEmployee1);
+    employees.put(compositeKey2.toString(), pdxEmployee2);
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    employees.destroy(compositeKey1.toString());
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(3));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertIntoTable() throws Exception {
+    setupRegion(null);
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertIntoTableWithCompositeKey() throws Exception {
+    setupRegion("id,age");
+    JSONObject compositeKey1 = new JSONObject();
+    compositeKey1.put("id", pdxEmployee1.getField("id"));
+    compositeKey1.put("age", pdxEmployee1.getField("age"));
+    String actualKey = compositeKey1.toString();
+    JSONObject compositeKey2 = new JSONObject();
+    compositeKey2.put("id", pdxEmployee2.getField("id"));
+    compositeKey2.put("age", pdxEmployee2.getField("age"));
+
+    employees.put(actualKey, pdxEmployee1);
+    employees.put(compositeKey2.toString(), pdxEmployee2);
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canUpdateTable() throws Exception {
+    setupRegion(null);
     employees.put("1", pdxEmployee1);
 
     awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
@@ -176,7 +246,32 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   }
 
   @Test
+  public void canUpdateTableWithCompositeKey() throws Exception {
+    setupRegion("id,age");
+    PdxInstance myPdx = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("id", "1").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    JSONObject compositeKey1 = new JSONObject();
+    compositeKey1.put("id", myPdx.getField("id"));
+    compositeKey1.put("age", myPdx.getField("age"));
+    employees.put(compositeKey1.toString(), myPdx);
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+    WritablePdxInstance updatedPdx = myPdx.createWriter();
+    updatedPdx.setField("name", "updated");
+    Employee updatedEmployee = (Employee) updatedPdx.getObject();
+
+    employees.put(compositeKey1.toString(), updatedPdx);
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", updatedEmployee);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canUpdateBecomeInsert() throws Exception {
+    setupRegion(null);
     employees.put("1", pdxEmployee1);
 
     awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
@@ -196,6 +291,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canInsertBecomeUpdate() throws Exception {
+    setupRegion(null);
     statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
 
@@ -221,9 +317,9 @@ public abstract class JdbcAsyncWriterIntegrationTest {
     assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
   }
 
-  private Region<String, PdxInstance> createRegionWithJDBCAsyncWriter(String regionName)
+  private Region<String, PdxInstance> createRegionWithJDBCAsyncWriter(String regionName, String ids)
       throws RegionMappingExistsException {
-    jdbcWriter = new JdbcAsyncWriter(createSqlHandler(), cache);
+    jdbcWriter = new JdbcAsyncWriter(createSqlHandler(ids), cache);
     cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
         .create("jdbcAsyncQueue", jdbcWriter);
 
@@ -239,10 +335,10 @@ public abstract class JdbcAsyncWriterIntegrationTest {
     assertThat(size).isEqualTo(expected);
   }
 
-  private SqlHandler createSqlHandler()
+  private SqlHandler createSqlHandler(String ids)
       throws RegionMappingExistsException {
     return new SqlHandler(new TableMetaDataManager(),
-        TestConfigService.getTestConfigService(getConnectionUrl()),
+        TestConfigService.getTestConfigService(getConnectionUrl(), ids),
         testDataSourceFactory);
   }
 
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index 94fa59d..06e1cd3 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -29,6 +29,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Date;
 
+import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -446,6 +447,36 @@ public abstract class JdbcDistributedTest implements Serializable {
   }
 
   @Test
+  public void getReadsFromDBWithCompositeKey() throws Exception {
+    createTable();
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true, "id,age");
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+      JSONObject compositeKey1 = new JSONObject();
+      compositeKey1.put("id", pdxEmployee1.getField("id"));
+      compositeKey1.put("age", pdxEmployee1.getField("age"));
+      String key = compositeKey1.toString();
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(key, pdxEmployee1);
+      region.invalidate(key);
+      JdbcWriter<Object, Object> writer =
+          (JdbcWriter<Object, Object>) region.getAttributes().getCacheWriter();
+      long writeCallsCompletedBeforeGet = writer.getTotalEvents();
+
+      Employee result = (Employee) region.get(key);
+
+      assertThat(result.getId()).isEqualTo("id1");
+      assertThat(result.getName()).isEqualTo("Emp1");
+      assertThat(result.getAge()).isEqualTo(55);
+      assertThat(writer.getTotalEvents()).isEqualTo(writeCallsCompletedBeforeGet);
+    });
+  }
+
+  @Test
   public void getReadsFromDBWithAsyncWriter() throws Exception {
     createTable();
     createRegionUsingGfsh();
@@ -640,15 +671,21 @@ public abstract class JdbcDistributedTest implements Serializable {
   }
 
   private void createMapping(String regionName, String connectionName, boolean synchronous) {
-    createMapping(regionName, connectionName, Employee.class.getName(), synchronous);
+    createMapping(regionName, connectionName, Employee.class.getName(), synchronous, null);
   }
 
   private void createMapping(String regionName, String connectionName, String pdxClassName,
       boolean synchronous) {
+    createMapping(regionName, connectionName, pdxClassName, synchronous, null);
+  }
+
+  private void createMapping(String regionName, String connectionName, String pdxClassName,
+      boolean synchronous, String ids) {
     final String commandStr = "create jdbc-mapping --region=" + regionName
         + " --data-source=" + connectionName
         + " --synchronous=" + synchronous
-        + " --pdx-name=" + pdxClassName;
+        + " --pdx-name=" + pdxClassName
+        + ((ids != null) ? (" --id=" + ids) : "");
     gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
     if (!synchronous) {
       final String alterAsyncQueue =
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index 71858f7..241e2ee 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Date;
 
+import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -118,6 +119,26 @@ public abstract class JdbcLoaderIntegrationTest {
   }
 
   @Test
+  public void verifyGetWithPdxClassNameAndCompositeKey() throws Exception {
+    createEmployeeTable();
+    statement
+        .execute("Insert into " + REGION_TABLE_NAME + "(id, name, age) values('1', 'Emp1', 21)");
+    String ids = "id,name";
+    Region<String, Employee> region =
+        createRegionWithJDBCLoader(REGION_TABLE_NAME, Employee.class.getName(), ids);
+    createPdxType();
+
+    JSONObject key = new JSONObject();
+    key.put("id", "1");
+    key.put("name", "Emp1");
+    Employee value = region.get(key.toString());
+
+    assertThat(value.getId()).isEqualTo("1");
+    assertThat(value.getName()).isEqualTo("Emp1");
+    assertThat(value.getAge()).isEqualTo(21);
+  }
+
+  @Test
   public void verifyGetWithSupportedFieldsWithPdxClassName() throws Exception {
     createClassWithSupportedPdxFieldsTable(statement, REGION_TABLE_NAME);
     ClassWithSupportedPdxFields classWithSupportedPdxFields =
@@ -149,23 +170,29 @@ public abstract class JdbcLoaderIntegrationTest {
     assertThat(pdx).isNull();
   }
 
-  private SqlHandler createSqlHandler(String pdxClassName)
+  private SqlHandler createSqlHandler(String pdxClassName, String ids)
       throws RegionMappingExistsException {
     return new SqlHandler(new TableMetaDataManager(),
         TestConfigService.getTestConfigService((InternalCache) cache, pdxClassName,
-            getConnectionUrl()),
+            getConnectionUrl(), ids),
         testDataSourceFactory);
   }
 
-  private <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName)
+  private <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
+      String ids)
       throws RegionMappingExistsException {
     JdbcLoader<K, V> jdbcLoader =
-        new JdbcLoader<>(createSqlHandler(pdxClassName), cache);
+        new JdbcLoader<>(createSqlHandler(pdxClassName, ids), cache);
     RegionFactory<K, V> regionFactory = cache.createRegionFactory(REPLICATE);
     regionFactory.setCacheLoader(jdbcLoader);
     return regionFactory.create(regionName);
   }
 
+  private <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName)
+      throws RegionMappingExistsException {
+    return createRegionWithJDBCLoader(regionName, pdxClassName, null);
+  }
+
   private ClassWithSupportedPdxFields createClassWithSupportedPdxFieldsForInsert(String key) {
     ClassWithSupportedPdxFields classWithSupportedPdxFields =
         new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2, 3, 4, 5.5f, 6.0, "BigEmp",
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
index 79ff90d..4db143f 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -24,6 +24,7 @@ import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +39,7 @@ import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.WritablePdxInstance;
 
 public abstract class JdbcWriterIntegrationTest {
 
@@ -60,20 +62,25 @@ public abstract class JdbcWriterIntegrationTest {
   public void setUp() throws Exception {
     cache = (InternalCache) new CacheFactory().set("locators", "").set("mcast-port", "0")
         .setPdxReadSerialized(false).create();
-    employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME);
 
     connection = getConnection();
     statement = connection.createStatement();
     statement.execute("Create Table " + REGION_TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
-    pdx1 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name", "Emp1")
+    pdx1 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("id", "1")
+        .writeString("name", "Emp1")
         .writeInt("age", 55).create();
-    pdx2 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name", "Emp2")
+    pdx2 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("id", "2")
+        .writeString("name", "Emp2")
         .writeInt("age", 21).create();
     employee1 = (Employee) pdx1.getObject();
     employee2 = (Employee) pdx2.getObject();
   }
 
+  private void setupRegion(String ids) throws RegionMappingExistsException {
+    employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME, ids);
+  }
+
   @After
   public void tearDown() throws Exception {
     cache.close();
@@ -102,6 +109,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canInsertIntoTable() throws Exception {
+    setupRegion(null);
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -113,7 +121,29 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
+  public void canInsertIntoTableWithCompositeKey() throws Exception {
+    setupRegion("id,age");
+    JSONObject compositeKey1 = new JSONObject();
+    compositeKey1.put("id", pdx1.getField("id"));
+    compositeKey1.put("age", pdx1.getField("age"));
+    String actualKey = compositeKey1.toString();
+    JSONObject compositeKey2 = new JSONObject();
+    compositeKey2.put("id", pdx2.getField("id"));
+    compositeKey2.put("age", pdx2.getField("age"));
+
+    employees.put(actualKey, pdx1);
+    employees.put(compositeKey2.toString(), pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canPutAllInsertIntoTable() throws Exception {
+    setupRegion(null);
     Map<String, PdxInstance> putAllMap = new HashMap<>();
     putAllMap.put("1", pdx1);
     putAllMap.put("2", pdx2);
@@ -128,6 +158,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
+    setupRegion(null);
     PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName())
         .writeString("name", "Emp1").writeInt("age", 55).writeString("id", "3").create();
     employees.put("1", pdxInstanceWithId);
@@ -139,14 +170,17 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
-  public void putNonPdxInstanceFails() {
+  public void putNonPdxInstanceFails() throws RegionMappingExistsException {
+    setupRegion(null);
     Region nonPdxEmployees = this.employees;
     Throwable thrown = catchThrowable(() -> nonPdxEmployees.put("1", "non pdx instance"));
     assertThat(thrown).isInstanceOf(IllegalArgumentException.class);
   }
 
   @Test
-  public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
+  public void putNonPdxInstanceThatIsPdxSerializable()
+      throws SQLException, RegionMappingExistsException {
+    setupRegion(null);
     Region nonPdxEmployees = this.employees;
     Employee value = new Employee("2", "Emp2", 22);
     nonPdxEmployees.put("2", value);
@@ -159,6 +193,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canDestroyFromTable() throws Exception {
+    setupRegion(null);
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -171,7 +206,28 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
+  public void canDestroyFromTableWithCompositeKey() throws Exception {
+    setupRegion("id,age");
+    JSONObject compositeKey1 = new JSONObject();
+    compositeKey1.put("id", pdx1.getField("id"));
+    compositeKey1.put("age", pdx1.getField("age"));
+    JSONObject compositeKey2 = new JSONObject();
+    compositeKey2.put("id", pdx2.getField("id"));
+    compositeKey2.put("age", pdx2.getField("age"));
+    employees.put(compositeKey1.toString(), pdx1);
+    employees.put(compositeKey2.toString(), pdx2);
+
+    employees.destroy(compositeKey1.toString());
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canUpdateTable() throws Exception {
+    setupRegion(null);
     employees.put("1", pdx1);
     employees.put("1", pdx2);
 
@@ -182,7 +238,30 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
+  public void canUpdateTableWithCompositeKey() throws Exception {
+    setupRegion("id,age");
+    PdxInstance myPdx = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("id", "1").writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    JSONObject compositeKey1 = new JSONObject();
+    compositeKey1.put("id", myPdx.getField("id"));
+    compositeKey1.put("age", myPdx.getField("age"));
+    employees.put(compositeKey1.toString(), myPdx);
+    WritablePdxInstance updatedPdx = myPdx.createWriter();
+    updatedPdx.setField("name", "updated");
+    Employee updatedEmployee = (Employee) updatedPdx.getObject();
+
+    employees.put(compositeKey1.toString(), updatedPdx);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", updatedEmployee);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canUpdateBecomeInsert() throws Exception {
+    setupRegion(null);
     employees.put("1", pdx1);
 
     statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
@@ -198,6 +277,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canInsertBecomeUpdate() throws Exception {
+    setupRegion(null);
     statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
 
@@ -209,9 +289,10 @@ public abstract class JdbcWriterIntegrationTest {
     assertThat(resultSet.next()).isFalse();
   }
 
-  private Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName)
+  private Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName,
+      String ids)
       throws RegionMappingExistsException {
-    jdbcWriter = new JdbcWriter(createSqlHandler(), cache);
+    jdbcWriter = new JdbcWriter(createSqlHandler(ids), cache);
 
     RegionFactory<String, PdxInstance> regionFactory =
         cache.createRegionFactory(RegionShortcut.REPLICATE);
@@ -226,17 +307,17 @@ public abstract class JdbcWriterIntegrationTest {
     assertThat(size).isEqualTo(expected);
   }
 
-  private SqlHandler createSqlHandler()
+  private SqlHandler createSqlHandler(String ids)
       throws RegionMappingExistsException {
     return new SqlHandler(new TableMetaDataManager(),
-        TestConfigService.getTestConfigService(getConnectionUrl()),
+        TestConfigService.getTestConfigService(getConnectionUrl(), ids),
         testDataSourceFactory);
   }
 
-  private void assertRecordMatchesEmployee(ResultSet resultSet, String key, Employee employee)
+  private void assertRecordMatchesEmployee(ResultSet resultSet, String id, Employee employee)
       throws SQLException {
     assertThat(resultSet.next()).isTrue();
-    assertThat(resultSet.getString("id")).isEqualTo(key);
+    assertThat(resultSet.getString("id")).isEqualTo(id);
     assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
     assertThat(resultSet.getInt("age")).isEqualTo(employee.getAge());
   }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
index 399229c..485745e 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
@@ -23,6 +23,8 @@ import java.sql.DatabaseMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
 
 import org.junit.After;
 import org.junit.Before;
@@ -85,9 +87,9 @@ public abstract class TableMetaDataManagerIntegrationTest {
     createTable();
     TableMetaDataView metaData = manager.getTableMetaDataView(connection, REGION_TABLE_NAME, null);
 
-    String keyColumnName = metaData.getKeyColumnName();
+    List<String> keyColumnNames = metaData.getKeyColumnNames();
 
-    assertThat(keyColumnName).isEqualTo("id");
+    assertThat(keyColumnNames).isEqualTo(Arrays.asList("id"));
   }
 
   @Test
@@ -96,9 +98,9 @@ public abstract class TableMetaDataManagerIntegrationTest {
     TableMetaDataView metaData =
         manager.getTableMetaDataView(connection, REGION_TABLE_NAME, "nonprimaryid");
 
-    String keyColumnName = metaData.getKeyColumnName();
+    List<String> keyColumnNames = metaData.getKeyColumnNames();
 
-    assertThat(keyColumnName).isEqualTo("nonprimaryid");
+    assertThat(keyColumnNames).isEqualTo(Arrays.asList("nonprimaryid"));
   }
 
   @Test
@@ -107,9 +109,9 @@ public abstract class TableMetaDataManagerIntegrationTest {
     TableMetaDataView metaData =
         manager.getTableMetaDataView(connection, REGION_TABLE_NAME, "NonPrimaryId");
 
-    String keyColumnName = metaData.getKeyColumnName();
+    List<String> keyColumnNames = metaData.getKeyColumnNames();
 
-    assertThat(keyColumnName).isEqualTo("NonPrimaryId");
+    assertThat(keyColumnNames).isEqualTo(Arrays.asList("NonPrimaryId"));
   }
 
   @Test
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
index 2441ae8..cd31ce6 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
@@ -34,13 +34,24 @@ public class TestConfigService {
     return getTestConfigService(createMockCache(), null, connectionUrl);
   }
 
+  public static JdbcConnectorServiceImpl getTestConfigService(String connectionUrl, String ids)
+      throws RegionMappingExistsException {
+    return getTestConfigService(createMockCache(), null, connectionUrl, ids);
+  }
+
   public static JdbcConnectorServiceImpl getTestConfigService(InternalCache cache,
       String pdxClassName, String connectionUrl)
       throws RegionMappingExistsException {
+    return getTestConfigService(cache, pdxClassName, connectionUrl, null);
+  }
+
+  public static JdbcConnectorServiceImpl getTestConfigService(InternalCache cache,
+      String pdxClassName, String connectionUrl, String ids)
+      throws RegionMappingExistsException {
 
     JdbcConnectorServiceImpl service = new JdbcConnectorServiceImpl();
     service.init(cache);
-    service.createRegionMapping(createRegionMapping(pdxClassName));
+    service.createRegionMapping(createRegionMapping(pdxClassName, ids));
     return service;
   }
 
@@ -50,8 +61,8 @@ public class TestConfigService {
     return cache;
   }
 
-  private static RegionMapping createRegionMapping(String pdxClassName) {
+  private static RegionMapping createRegionMapping(String pdxClassName, String ids) {
     return new RegionMapping(REGION_NAME, pdxClassName, REGION_TABLE_NAME,
-        CONNECTION_CONFIG_NAME, null);
+        CONNECTION_CONFIG_NAME, ids);
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/EntryColumnData.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/EntryColumnData.java
index 5630f03..c3a4bbb 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/EntryColumnData.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/EntryColumnData.java
@@ -18,16 +18,17 @@ import java.util.Collections;
 import java.util.List;
 
 class EntryColumnData {
-  private final ColumnData entryKeyColumnData;
+  private final List<ColumnData> entryKeyColumnData;
   private final List<ColumnData> entryValueColumnData;
 
-  EntryColumnData(ColumnData entryKeyColumnData, List<ColumnData> entryValueColumnData) {
-    this.entryKeyColumnData = entryKeyColumnData;
+  EntryColumnData(List<ColumnData> entryKeyColumnData, List<ColumnData> entryValueColumnData) {
+    this.entryKeyColumnData =
+        entryKeyColumnData != null ? entryKeyColumnData : Collections.emptyList();
     this.entryValueColumnData =
         entryValueColumnData != null ? entryValueColumnData : Collections.emptyList();
   }
 
-  public ColumnData getEntryKeyColumnData() {
+  public List<ColumnData> getEntryKeyColumnData() {
     return entryKeyColumnData;
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 9335d03..a09db75 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -22,9 +22,13 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 
 import javax.sql.DataSource;
 
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Operation;
@@ -94,7 +98,7 @@ public class SqlHandler {
 
   private ResultSet executeReadQuery(PreparedStatement statement, EntryColumnData entryColumnData)
       throws SQLException {
-    setValuesInStatement(statement, entryColumnData);
+    setValuesInStatement(statement, entryColumnData, Operation.GET);
     return statement.executeQuery();
   }
 
@@ -108,17 +112,23 @@ public class SqlHandler {
     return regionMapping;
   }
 
-  private void setValuesInStatement(PreparedStatement statement, EntryColumnData entryColumnData)
+  private void setValuesInStatement(PreparedStatement statement, EntryColumnData entryColumnData,
+      Operation operation)
       throws SQLException {
     int index = 0;
-    for (ColumnData columnData : entryColumnData.getEntryValueColumnData()) {
+    if (operation.isCreate() || operation.isUpdate()) {
+      index = setValuesFromColumnData(statement, entryColumnData.getEntryValueColumnData(), index);
+    }
+    setValuesFromColumnData(statement, entryColumnData.getEntryKeyColumnData(), index);
+  }
+
+  private int setValuesFromColumnData(PreparedStatement statement, List<ColumnData> columnDataList,
+      int index) throws SQLException {
+    for (ColumnData columnData : columnDataList) {
       index++;
       setValueOnStatement(statement, index, columnData);
     }
-
-    ColumnData keyColumnData = entryColumnData.getEntryKeyColumnData();
-    index++;
-    setValueOnStatement(statement, index, keyColumnData);
+    return index;
   }
 
   private void setValueOnStatement(PreparedStatement statement, int index, ColumnData columnData)
@@ -156,7 +166,7 @@ public class SqlHandler {
 
   public <K, V> void write(Region<K, V> region, Operation operation, K key, PdxInstance value)
       throws SQLException {
-    if (value == null && operation != Operation.DESTROY) {
+    if (value == null && !operation.isDestroy()) {
       throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
     }
     RegionMapping regionMapping = getMappingForRegion(region.getName());
@@ -169,7 +179,7 @@ public class SqlHandler {
       int updateCount = 0;
       try (PreparedStatement statement =
           getPreparedStatement(connection, tableMetaData, entryColumnData, operation)) {
-        updateCount = executeWriteStatement(statement, entryColumnData);
+        updateCount = executeWriteStatement(statement, entryColumnData, operation);
       } catch (SQLException e) {
         if (operation.isDestroy()) {
           throw e;
@@ -185,11 +195,11 @@ public class SqlHandler {
         Operation upsertOp = getOppositeOperation(operation);
         try (PreparedStatement upsertStatement =
             getPreparedStatement(connection, tableMetaData, entryColumnData, upsertOp)) {
-          updateCount = executeWriteStatement(upsertStatement, entryColumnData);
+          updateCount = executeWriteStatement(upsertStatement, entryColumnData, operation);
         }
       }
 
-      assert updateCount == 1;
+      assert updateCount == 1 : "expected 1 but updateCount was: " + updateCount;
     }
   }
 
@@ -197,9 +207,10 @@ public class SqlHandler {
     return operation.isUpdate() ? Operation.CREATE : Operation.UPDATE;
   }
 
-  private int executeWriteStatement(PreparedStatement statement, EntryColumnData entryColumnData)
+  private int executeWriteStatement(PreparedStatement statement, EntryColumnData entryColumnData,
+      Operation operation)
       throws SQLException {
-    setValuesInStatement(statement, entryColumnData);
+    setValuesInStatement(statement, entryColumnData, operation);
     return statement.executeUpdate();
   }
 
@@ -230,24 +241,64 @@ public class SqlHandler {
 
   <K> EntryColumnData getEntryColumnData(TableMetaDataView tableMetaData,
       RegionMapping regionMapping, K key, PdxInstance value, Operation operation) {
-    String keyColumnName = tableMetaData.getKeyColumnName();
-    ColumnData keyColumnData =
-        new ColumnData(keyColumnName, key, tableMetaData.getColumnDataType(keyColumnName));
+    List<ColumnData> keyColumnData = createKeyColumnDataList(tableMetaData, regionMapping, key);
     List<ColumnData> valueColumnData = null;
 
     if (operation.isCreate() || operation.isUpdate()) {
-      valueColumnData = createColumnDataList(tableMetaData, regionMapping, value);
+      valueColumnData = createValueColumnDataList(tableMetaData, regionMapping, value);
     }
 
     return new EntryColumnData(keyColumnData, valueColumnData);
   }
 
-  private List<ColumnData> createColumnDataList(TableMetaDataView tableMetaData,
+  private <K> List<ColumnData> createKeyColumnDataList(TableMetaDataView tableMetaData,
+      RegionMapping regionMapping, K key) {
+    List<String> keyColumnNames = tableMetaData.getKeyColumnNames();
+    List<ColumnData> result = new ArrayList<>();
+    if (keyColumnNames.size() == 1) {
+      String keyColumnName = keyColumnNames.get(0);
+      ColumnData columnData =
+          new ColumnData(keyColumnName, key, tableMetaData.getColumnDataType(keyColumnName));
+      result.add(columnData);
+    } else {
+      if (!(key instanceof String)) {
+        throw new JdbcConnectorException(
+            "The key \"" + key + "\" of class \"" + key.getClass().getName()
+                + "\" must be a java.lang.String because multiple columns are configured as ids.");
+      }
+      JSONObject compositeKey = null;
+      try {
+        compositeKey = new JSONObject((String) key);
+      } catch (JSONException ex) {
+        throw new JdbcConnectorException("The key \"" + key
+            + "\" must be a valid JSON string because multiple columns are configured as ids. Details: "
+            + ex.getMessage());
+      }
+      Set<String> fieldNames = compositeKey.keySet();
+      if (fieldNames.size() != keyColumnNames.size()) {
+        throw new JdbcConnectorException("The key \"" + key + "\" should have "
+            + keyColumnNames.size() + " fields but has " + fieldNames.size() + " fields.");
+      }
+      for (String fieldName : fieldNames) {
+        String columnName = regionMapping.getColumnNameForField(fieldName, tableMetaData);
+        if (!keyColumnNames.contains(columnName)) {
+          throw new JdbcConnectorException("The key \"" + key + "\" has the field \"" + fieldName
+              + "\" which does not match any of the key columns: " + keyColumnNames);
+        }
+        ColumnData columnData = new ColumnData(columnName, compositeKey.get(fieldName),
+            tableMetaData.getColumnDataType(columnName));
+        result.add(columnData);
+      }
+    }
+    return result;
+  }
+
+  private List<ColumnData> createValueColumnDataList(TableMetaDataView tableMetaData,
       RegionMapping regionMapping, PdxInstance value) {
     List<ColumnData> result = new ArrayList<>();
     for (String fieldName : value.getFieldNames()) {
       String columnName = regionMapping.getColumnNameForField(fieldName, tableMetaData);
-      if (tableMetaData.getKeyColumnName().equals(columnName)) {
+      if (tableMetaData.getKeyColumnNames().contains(columnName)) {
         continue;
       }
       ColumnData columnData = new ColumnData(columnName, value.getField(fieldName),
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
index 5087bbe..c2c697f 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
@@ -14,6 +14,9 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
+import java.util.Iterator;
+import java.util.stream.Stream;
+
 class SqlStatementFactory {
   private final String quote;
 
@@ -22,19 +25,32 @@ class SqlStatementFactory {
   }
 
   String createSelectQueryString(String tableName, EntryColumnData entryColumnData) {
-    ColumnData keyCV = entryColumnData.getEntryKeyColumnData();
-    return "SELECT * FROM " + quoteIdentifier(tableName) + " WHERE "
-        + quoteIdentifier(keyCV.getColumnName()) + " = ?";
+    return addKeyColumnsToQuery(entryColumnData,
+        new StringBuilder("SELECT * FROM " + quoteIdentifier(tableName)));
   }
 
   String createDestroySqlString(String tableName, EntryColumnData entryColumnData) {
-    ColumnData keyCV = entryColumnData.getEntryKeyColumnData();
-    return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE "
-        + quoteIdentifier(keyCV.getColumnName()) + " = ?";
+    return addKeyColumnsToQuery(entryColumnData,
+        new StringBuilder("DELETE FROM " + quoteIdentifier(tableName)));
+  }
+
+  private String addKeyColumnsToQuery(EntryColumnData entryColumnData, StringBuilder queryBuilder) {
+    queryBuilder.append(" WHERE ");
+    Iterator<ColumnData> iterator = entryColumnData.getEntryKeyColumnData().iterator();
+    while (iterator.hasNext()) {
+      ColumnData keyColumn = iterator.next();
+      boolean onLastColumn = !iterator.hasNext();
+      queryBuilder.append(quoteIdentifier(keyColumn.getColumnName())).append(" = ?");
+      if (!onLastColumn) {
+        queryBuilder.append(" AND ");
+      }
+    }
+    return queryBuilder.toString();
   }
 
   String createUpdateSqlString(String tableName, EntryColumnData entryColumnData) {
-    StringBuilder query = new StringBuilder("UPDATE " + quoteIdentifier(tableName) + " SET ");
+    StringBuilder query =
+        new StringBuilder("UPDATE ").append(quoteIdentifier(tableName)).append(" SET ");
     int idx = 0;
     for (ColumnData column : entryColumnData.getEntryValueColumnData()) {
       idx++;
@@ -44,31 +60,37 @@ class SqlStatementFactory {
       query.append(quoteIdentifier(column.getColumnName()));
       query.append(" = ?");
     }
-
-    ColumnData keyColumnData = entryColumnData.getEntryKeyColumnData();
-    query.append(" WHERE ");
-    query.append(quoteIdentifier(keyColumnData.getColumnName()));
-    query.append(" = ?");
-
-    return query.toString();
+    return addKeyColumnsToQuery(entryColumnData, query);
   }
 
   String createInsertSqlString(String tableName, EntryColumnData entryColumnData) {
     StringBuilder columnNames =
-        new StringBuilder("INSERT INTO " + quoteIdentifier(tableName) + " (");
+        new StringBuilder("INSERT INTO ").append(quoteIdentifier(tableName)).append(" (");
     StringBuilder columnValues = new StringBuilder(" VALUES (");
-
-    for (ColumnData column : entryColumnData.getEntryValueColumnData()) {
-      columnNames.append(quoteIdentifier(column.getColumnName())).append(", ");
-      columnValues.append("?,");
-    }
-
-    ColumnData keyColumnData = entryColumnData.getEntryKeyColumnData();
-    columnNames.append(quoteIdentifier(keyColumnData.getColumnName())).append(")");
-    columnValues.append("?)");
+    addColumnDataToSqlString(entryColumnData, columnNames, columnValues);
+    columnNames.append(')');
+    columnValues.append(')');
     return columnNames.append(columnValues).toString();
   }
 
+  private void addColumnDataToSqlString(EntryColumnData entryColumnData, StringBuilder columnNames,
+      StringBuilder columnValues) {
+    Stream<ColumnData> values = entryColumnData.getEntryValueColumnData().stream();
+    Stream<ColumnData> keys = entryColumnData.getEntryKeyColumnData().stream();
+    Stream<ColumnData> columnDataStream = Stream.concat(values, keys);
+    final boolean[] firstTime = new boolean[] {true};
+    columnDataStream.forEachOrdered(column -> {
+      if (!firstTime[0]) {
+        columnNames.append(',');
+        columnValues.append(',');
+      } else {
+        firstTime[0] = false;
+      }
+      columnNames.append(quoteIdentifier(column.getColumnName()));
+      columnValues.append('?');
+    });
+  }
+
   private String quoteIdentifier(String identifier) {
     return quote + identifier + quote;
   }
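
Editor's note: to make the refactored statement factory concrete, these are the shapes of the SQL it now generates for a composite key, assuming one value column "name", key columns "id" and "age", a table named "employees", and a double-quote identifier quote string (all names illustrative).

class GeneratedSqlShapesSketch {
  static final String SELECT =
      "SELECT * FROM \"employees\" WHERE \"id\" = ? AND \"age\" = ?";
  static final String DELETE =
      "DELETE FROM \"employees\" WHERE \"id\" = ? AND \"age\" = ?";
  static final String UPDATE =
      "UPDATE \"employees\" SET \"name\" = ? WHERE \"id\" = ? AND \"age\" = ?";
  // value columns first, then key columns, matching addColumnDataToSqlString
  static final String INSERT =
      "INSERT INTO \"employees\" (\"name\",\"id\",\"age\") VALUES (?,?,?)";
}
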
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
index 301e8b1..904acba 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
@@ -17,18 +17,19 @@
 package org.apache.geode.connectors.jdbc.internal;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Set;
 
 public class TableMetaData implements TableMetaDataView {
 
   private final String tableName;
-  private final String keyColumnName;
+  private final List<String> keyColumnNames;
   private final HashMap<String, Integer> columnNameToTypeMap;
   private final String identifierQuoteString;
 
-  public TableMetaData(String tableName, String keyColumnName, String quoteString) {
+  public TableMetaData(String tableName, List<String> keyColumnNames, String quoteString) {
     this.tableName = tableName;
-    this.keyColumnName = keyColumnName;
+    this.keyColumnNames = keyColumnNames;
     this.columnNameToTypeMap = new HashMap<>();
     this.identifierQuoteString = quoteString;
   }
@@ -39,8 +40,8 @@ public class TableMetaData implements TableMetaDataView {
   }
 
   @Override
-  public String getKeyColumnName() {
-    return this.keyColumnName;
+  public List<String> getKeyColumnNames() {
+    return this.keyColumnNames;
   }
 
   @Override
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
index a454d3e..f5f9b16 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
@@ -18,6 +18,9 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -47,12 +50,12 @@ public class TableMetaDataManager {
       DatabaseMetaData metaData = connection.getMetaData();
       try (ResultSet tables = metaData.getTables(null, null, "%", null)) {
         String realTableName = getTableNameFromMetaData(tableName, tables);
-        String key = getPrimaryKeyColumnNameFromMetaData(realTableName, metaData, ids);
+        List<String> keys = getPrimaryKeyColumnNamesFromMetaData(realTableName, metaData, ids);
         String quoteString = metaData.getIdentifierQuoteString();
         if (quoteString == null) {
           quoteString = "";
         }
-        result = new TableMetaData(realTableName, key, quoteString);
+        result = new TableMetaData(realTableName, keys, quoteString);
         getDataTypesFromMetaData(realTableName, metaData, result);
       }
     } catch (SQLException e) {
@@ -85,28 +88,30 @@ public class TableMetaDataManager {
     return result;
   }
 
-  private String getPrimaryKeyColumnNameFromMetaData(String tableName, DatabaseMetaData metaData,
+  private List<String> getPrimaryKeyColumnNamesFromMetaData(String tableName,
+      DatabaseMetaData metaData,
       String ids)
       throws SQLException {
+    List<String> keys = new ArrayList<>();
+
     if (ids != null && !ids.isEmpty()) {
-      if (!doesColumnExistInTable(tableName, metaData, ids)) {
-        throw new JdbcConnectorException(
-            "The table " + tableName + " does not have a column named " + ids);
-      }
-      return ids;
-    }
-    try (ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName)) {
-      if (!primaryKeys.next()) {
-        throw new JdbcConnectorException(
-            "The table " + tableName + " does not have a primary key column.");
+      keys.addAll(Arrays.asList(ids.split(",")));
+      for (String key : keys) {
+        checkColumnExistsInTable(tableName, metaData, key);
       }
-      String key = primaryKeys.getString("COLUMN_NAME");
-      if (primaryKeys.next()) {
-        throw new JdbcConnectorException(
-            "The table " + tableName + " has more than one primary key column.");
+    } else {
+      try (ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName)) {
+        while (primaryKeys.next()) {
+          String key = primaryKeys.getString("COLUMN_NAME");
+          keys.add(key);
+        }
+        if (keys.isEmpty()) {
+          throw new JdbcConnectorException(
+              "The table " + tableName + " does not have a primary key column.");
+        }
       }
-      return key;
     }
+    return keys;
   }
 
   private void getDataTypesFromMetaData(String tableName, DatabaseMetaData metaData,
@@ -120,14 +125,14 @@ public class TableMetaDataManager {
     }
   }
 
-  private boolean doesColumnExistInTable(String tableName, DatabaseMetaData metaData,
+  private void checkColumnExistsInTable(String tableName, DatabaseMetaData metaData,
       String columnName) throws SQLException {
     int caseInsensitiveMatches = 0;
     try (ResultSet columnData = metaData.getColumns(null, null, tableName, "%")) {
       while (columnData.next()) {
         String realColumnName = columnData.getString("COLUMN_NAME");
         if (columnName.equals(realColumnName)) {
-          return true;
+          return;
         } else if (columnName.equalsIgnoreCase(realColumnName)) {
           caseInsensitiveMatches++;
         }
@@ -136,7 +141,9 @@ public class TableMetaDataManager {
     if (caseInsensitiveMatches > 1) {
       throw new JdbcConnectorException(
           "The table " + tableName + " has more than one column that matches " + columnName);
+    } else if (caseInsensitiveMatches == 0) {
+      throw new JdbcConnectorException(
+          "The table " + tableName + " does not have a column named " + columnName);
     }
-    return caseInsensitiveMatches != 0;
   }
 }
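
Editor's note: with the metadata change above, a table whose primary key spans multiple columns works without any --id option, since every column returned by DatabaseMetaData.getPrimaryKeys() now becomes a key column. A hedged DDL sketch follows; table and column names are illustrative, mirroring the Employee tests.

import java.sql.SQLException;
import java.sql.Statement;

class CompositePrimaryKeyTableSketch {
  static void createTable(Statement statement) throws SQLException {
    // Both "id" and "age" are picked up as key columns from the table metadata.
    statement.execute("Create Table employees"
        + " (id varchar(10) not null, age int not null, name varchar(10),"
        + " primary key (id, age))");
  }
}
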
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
index 189d794..3c18dd4 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
@@ -16,12 +16,14 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
+
+import java.util.List;
 import java.util.Set;
 
 public interface TableMetaDataView {
   String getTableName();
 
-  String getKeyColumnName();
+  List<String> getKeyColumnNames();
 
   int getColumnDataType(String columnName);
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index e638d3c..4f8a0ab 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -65,7 +65,7 @@ public class CreateMappingCommand extends SingleGfshCommand {
       "By default, writes will be asynchronous. If true, writes will be synchronous.";
   static final String CREATE_MAPPING__ID_NAME = "id";
   static final String CREATE_MAPPING__ID_NAME__HELP =
-      "The table column name to use as the region key for this JDBC mapping.";
+      "The table column names to use as the region key for this JDBC mapping. If more than one column name is given then they must be separated by commas.";
 
   public static String createAsyncEventQueueName(String regionPath) {
     if (regionPath.startsWith("/")) {
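
Editor's note: the updated --id help corresponds to a gfsh invocation like the one JdbcDistributedTest builds; a sketch with illustrative region, data-source, and class names.

class CreateMappingCommandSketch {
  static final String COMMAND =
      "create jdbc-mapping --region=employees --data-source=TestDataSource"
          + " --synchronous=true --pdx-name=org.example.Employee --id=id,age";
}
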
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index d93743f..ca25888 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -38,6 +38,7 @@ import java.util.Date;
 import javax.sql.DataSource;
 
 import junitparams.JUnitParamsRunner;
+import org.json.JSONObject;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -90,7 +91,7 @@ public class SqlHandlerTest {
     tableMetaDataManager = mock(TableMetaDataManager.class);
     tableMetaDataView = mock(TableMetaDataView.class);
     when(tableMetaDataView.getTableName()).thenReturn(TABLE_NAME);
-    when(tableMetaDataView.getKeyColumnName()).thenReturn(KEY_COLUMN);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList(KEY_COLUMN));
     final String IDS = "ids";
     when(tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, IDS))
         .thenReturn(tableMetaDataView);
@@ -125,8 +126,12 @@ public class SqlHandlerTest {
   @SuppressWarnings("unchecked")
   @Test
   public void readThrowsIfNoMapping() throws Exception {
+    Region region = mock(Region.class);
+    when(region.getName()).thenReturn("myRegionName");
     thrown.expect(JdbcConnectorException.class);
-    handler.read(mock(Region.class), new Object());
+    thrown.expectMessage(
+        "JDBC mapping for region myRegionName not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
+    handler.read(region, new Object());
   }
 
   @Test
@@ -355,7 +360,6 @@ public class SqlHandlerTest {
     verify(statement).close();
   }
 
-
   @Test
   public void insertActionSucceeds() throws Exception {
     when(statement.executeUpdate()).thenReturn(1);
@@ -367,6 +371,27 @@ public class SqlHandlerTest {
   }
 
   @Test
+  public void insertActionSucceedsWithCompositeKey() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    compositeKey.put("fieldTwo", compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+
+    handler.write(region, Operation.CREATE, compositeKey.toString(), value);
+
+    verify(statement).setObject(1, compositeKeyFieldValueOne);
+    verify(statement).setObject(2, compositeKeyFieldValueTwo);
+    verify(statement, times(2)).setObject(anyInt(), any());
+    verify(statement).executeUpdate();
+    verify(statement).close();
+  }
+
+  @Test
   public void updateActionSucceeds() throws Exception {
     when(statement.executeUpdate()).thenReturn(1);
     Object updateKey = "updateKey";
@@ -377,6 +402,27 @@ public class SqlHandlerTest {
   }
 
   @Test
+  public void updateActionSucceedsWithCompositeKey() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    compositeKey.put("fieldTwo", compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+
+    handler.write(region, Operation.UPDATE, compositeKey.toString(), value);
+
+    verify(statement).setObject(1, compositeKeyFieldValueOne);
+    verify(statement).setObject(2, compositeKeyFieldValueTwo);
+    verify(statement, times(2)).setObject(anyInt(), any());
+    verify(statement).executeUpdate();
+    verify(statement).close();
+  }
+
+  @Test
   public void destroyActionSucceeds() throws Exception {
     when(statement.executeUpdate()).thenReturn(1);
     Object destroyKey = "destroyKey";
@@ -387,6 +433,26 @@ public class SqlHandlerTest {
   }
 
   @Test
+  public void destroyActionSucceedsWithCompositeKey() throws Exception {
+    when(statement.executeUpdate()).thenReturn(1);
+    Object destroyKeyFieldValueOne = "fieldValueOne";
+    Object destroyKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject destroyKey = new JSONObject();
+    destroyKey.put("fieldOne", destroyKeyFieldValueOne);
+    destroyKey.put("fieldTwo", destroyKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+
+    handler.write(region, Operation.DESTROY, destroyKey.toString(), value);
+
+    verify(statement).setObject(1, destroyKeyFieldValueOne);
+    verify(statement).setObject(2, destroyKeyFieldValueTwo);
+    verify(statement, times(2)).setObject(anyInt(), any());
+    verify(statement).close();
+  }
+
+  @Test
   public void destroyActionThatRemovesNoRowCompletesUnexceptionally() throws Exception {
     when(statement.executeUpdate()).thenReturn(0);
     Object destroyKey = "destroyKey";
@@ -504,47 +570,202 @@ public class SqlHandlerTest {
 
   @Test
   public void returnsCorrectColumnForGet() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
-    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
-
     EntryColumnData entryColumnData =
         handler.getEntryColumnData(tableMetaDataView, regionMapping, key, value, Operation.GET);
 
     assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
     assertThat(entryColumnData.getEntryValueColumnData()).isEmpty();
-    assertThat(entryColumnData.getEntryKeyColumnData().getColumnName()).isEqualTo(KEY_COLUMN);
+    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(1);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
+        .isEqualTo(KEY_COLUMN);
   }
 
   @Test
-  public void returnsCorrectColumnsForUpsertOperations() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
+  public void returnsCorrectColumnForGetGivenCompositeKey() throws Exception {
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    compositeKey.put("fieldTwo", compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+
+    EntryColumnData entryColumnData =
+        handler.getEntryColumnData(tableMetaDataView, regionMapping, compositeKey.toString(), value,
+            Operation.GET);
+
+    assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
+    assertThat(entryColumnData.getEntryValueColumnData()).isEmpty();
+    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(2);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
+        .isEqualTo("fieldOne");
+    assertThat(entryColumnData.getEntryKeyColumnData().get(1).getColumnName())
+        .isEqualTo("fieldTwo");
+  }
+
+  @Test
+  public void getEntryColumnDataGivenWrongNumberOfCompositeKeyFieldsFails() throws Exception {
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage(
+        "The key \"" + compositeKey.toString() + "\" should have 2 fields but has 1 fields.");
+
+    handler.getEntryColumnData(tableMetaDataView, regionMapping, compositeKey.toString(), value,
+        Operation.GET);
+  }
+
+  @Test
+  public void getEntryColumnDataGivenWrongFieldNameInCompositeKeyFails() throws Exception {
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    compositeKey.put("fieldTwoWrong", compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage("The key \"" + compositeKey.toString()
+        + "\" has the field \"fieldTwoWrong\" which does not match any of the key columns: [fieldOne, fieldTwo]");
+
+    handler.getEntryColumnData(tableMetaDataView, regionMapping, compositeKey.toString(), value,
+        Operation.GET);
+  }
+
+  @Test
+  public void returnsCorrectColumnsForUpdate() throws Exception {
+    testGetEntryColumnDataForCreateOrUpdate(Operation.UPDATE);
+  }
+
+  @Test
+  public void returnsCorrectColumnsForCreate() throws Exception {
+    testGetEntryColumnDataForCreateOrUpdate(Operation.CREATE);
+  }
+
+  private void testGetEntryColumnDataForCreateOrUpdate(Operation operation) {
     String nonKeyColumn = "otherColumn";
     when(regionMapping.getColumnNameForField(eq(KEY_COLUMN), any())).thenReturn(KEY_COLUMN);
     when(regionMapping.getColumnNameForField(eq(nonKeyColumn), any())).thenReturn(nonKeyColumn);
-    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
     when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
 
     EntryColumnData entryColumnData =
-        handler.getEntryColumnData(tableMetaDataView, regionMapping, key, value, Operation.UPDATE);
+        handler.getEntryColumnData(tableMetaDataView, regionMapping, key, value, operation);
 
     assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
     assertThat(entryColumnData.getEntryValueColumnData()).hasSize(1);
     assertThat(entryColumnData.getEntryValueColumnData().get(0).getColumnName())
         .isEqualTo(nonKeyColumn);
-    assertThat(entryColumnData.getEntryKeyColumnData().getColumnName()).isEqualTo(KEY_COLUMN);
+    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(1);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
+        .isEqualTo(KEY_COLUMN);
   }
 
   @Test
-  public void returnsCorrectColumnForDestroy() throws Exception {
-    ResultSet primaryKeys = getPrimaryKeysMetaData();
-    when(primaryKeys.next()).thenReturn(true).thenReturn(false);
+  public void returnsCorrectColumnsForUpdateWithCompositeKey() throws Exception {
+    testGetEntryColumnDataForCreateOrUpdateWithCompositeKey(Operation.UPDATE);
+  }
+
+  @Test
+  public void returnsCorrectColumnsForCreateWithCompositeKey() throws Exception {
+    testGetEntryColumnDataForCreateOrUpdateWithCompositeKey(Operation.CREATE);
+  }
+
+  private void testGetEntryColumnDataForCreateOrUpdateWithCompositeKey(Operation operation) {
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    compositeKey.put("fieldTwo", compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+    String nonKeyColumn = "otherColumn";
+    when(regionMapping.getColumnNameForField(eq(nonKeyColumn), any())).thenReturn(nonKeyColumn);
+    when(value.getFieldNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo", nonKeyColumn));
 
     EntryColumnData entryColumnData =
+        handler.getEntryColumnData(tableMetaDataView, regionMapping, compositeKey.toString(), value,
+            operation);
+
+    assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
+    assertThat(entryColumnData.getEntryValueColumnData()).hasSize(1);
+    assertThat(entryColumnData.getEntryValueColumnData().get(0).getColumnName())
+        .isEqualTo(nonKeyColumn);
+    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(2);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
+        .isEqualTo("fieldOne");
+    assertThat(entryColumnData.getEntryKeyColumnData().get(1).getColumnName())
+        .isEqualTo("fieldTwo");
+  }
+
+  @Test
+  public void returnsCorrectColumnForDestroyWithCompositeKey() throws Exception {
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    JSONObject compositeKey = new JSONObject();
+    compositeKey.put("fieldOne", compositeKeyFieldValueOne);
+    compositeKey.put("fieldTwo", compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getColumnNameForField("fieldOne", tableMetaDataView)).thenReturn("fieldOne");
+    when(regionMapping.getColumnNameForField("fieldTwo", tableMetaDataView)).thenReturn("fieldTwo");
+
+    EntryColumnData entryColumnData =
+        handler.getEntryColumnData(tableMetaDataView, regionMapping, compositeKey.toString(), value,
+            Operation.DESTROY);
+
+    assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
+    assertThat(entryColumnData.getEntryValueColumnData()).isEmpty();
+    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(2);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
+        .isEqualTo("fieldOne");
+    assertThat(entryColumnData.getEntryKeyColumnData().get(1).getColumnName())
+        .isEqualTo("fieldTwo");
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getValue())
+        .isEqualTo(compositeKeyFieldValueOne);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(1).getValue())
+        .isEqualTo(compositeKeyFieldValueTwo);
+  }
+
+  @Test
+  public void returnsCorrectColumnForDestroy() throws Exception {
+    EntryColumnData entryColumnData =
         handler.getEntryColumnData(tableMetaDataView, regionMapping, key, value, Operation.DESTROY);
 
     assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
     assertThat(entryColumnData.getEntryValueColumnData()).isEmpty();
-    assertThat(entryColumnData.getEntryKeyColumnData().getColumnName()).isEqualTo(KEY_COLUMN);
+    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(1);
+    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
+        .isEqualTo(KEY_COLUMN);
+  }
+
+  @Test
+  public void getEntryColumnDataWhenMultipleIdColumnsGivenNonStringFails() throws Exception {
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    Object nonCompositeKey = Integer.valueOf(123);
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage(
+        "The key \"123\" of class \"java.lang.Integer\" must be a java.lang.String because multiple columns are configured as ids.");
+
+    handler.getEntryColumnData(tableMetaDataView, regionMapping, nonCompositeKey, value,
+        Operation.DESTROY);
+  }
+
+  @Test
+  public void getEntryColumnDataWhenMultipleIdColumnsGivenNonJsonStringFails() throws Exception {
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    String nonJsonKey = "myKey";
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage(
+        "The key \"myKey\" must be a valid JSON string because multiple columns are configured as ids. Details: Value myKey of type java.lang.String cannot be converted to JSONObject");
+
+    handler.getEntryColumnData(tableMetaDataView, regionMapping, nonJsonKey, value,
+        Operation.DESTROY);
   }
 
   private ResultSet getPrimaryKeysMetaData() throws SQLException {
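
The composite-key tests above pass SqlHandler a JSON-string key and expect one column/value pair per configured id column, with mismatched field counts or field names rejected. A rough sketch of that expansion (an illustration of the behavior the tests assert, not the SqlHandler source; CompositeKeyParseSketch and toKeyColumns are hypothetical names):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.json.JSONObject;

    public class CompositeKeyParseSketch {
      // Expands a JSON-string key into one column/value pair per key column.
      // The real handler also resolves pdx field names to column names through
      // the region mapping; the tests above use identical names for both.
      static Map<String, Object> toKeyColumns(String jsonKey, List<String> keyColumnNames)
          throws Exception {
        JSONObject parsed = new JSONObject(jsonKey);
        if (parsed.length() != keyColumnNames.size()) {
          throw new IllegalStateException("key should have " + keyColumnNames.size()
              + " fields but has " + parsed.length());
        }
        Map<String, Object> result = new LinkedHashMap<>();
        Iterator<String> fieldNames = parsed.keys();
        while (fieldNames.hasNext()) {
          String fieldName = fieldNames.next();
          if (!keyColumnNames.contains(fieldName)) {
            throw new IllegalStateException(
                "field " + fieldName + " does not match any of the key columns " + keyColumnNames);
          }
          result.put(fieldName, parsed.get(fieldName));
        }
        return result;
      }
    }
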
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
index e4ccc4b..0a13981 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -26,26 +26,28 @@ import org.junit.Test;
 public class SqlStatementFactoryTest {
 
   private static final String TABLE_NAME = "testTable";
-  private static final String KEY_COLUMN_NAME = "keyColumn";
+  private static final String KEY_COLUMN_1_NAME = "keyColumn1";
+  private static final String KEY_COLUMN_2_NAME = "keyColumn2";
   private static final String VALUE_COLUMN_1_NAME = "valueColumn1";
   private static final String VALUE_COLUMN_2_NAME = "valueColumn2";
+  private final List<ColumnData> keyColumnData = new ArrayList<>();
+  private final List<ColumnData> valueColumnData = new ArrayList<>();
 
   private EntryColumnData entryColumnData;
   private SqlStatementFactory factory = new SqlStatementFactory("");
 
   @Before
   public void setup() {
-    List<ColumnData> columnData = new ArrayList<>();
-    columnData.add(new ColumnData(VALUE_COLUMN_1_NAME, null, 0));
-    columnData.add(new ColumnData(VALUE_COLUMN_2_NAME, null, 0));
-    ColumnData keyColumnData = new ColumnData(KEY_COLUMN_NAME, null, 0);
-    entryColumnData = new EntryColumnData(keyColumnData, columnData);
+    valueColumnData.add(new ColumnData(VALUE_COLUMN_1_NAME, null, 0));
+    valueColumnData.add(new ColumnData(VALUE_COLUMN_2_NAME, null, 0));
+    keyColumnData.add(new ColumnData(KEY_COLUMN_1_NAME, null, 0));
+    entryColumnData = new EntryColumnData(keyColumnData, valueColumnData);
   }
 
   @Test
   public void getSelectQueryString() throws Exception {
     String expectedStatement =
-        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME);
 
     String statement = factory.createSelectQueryString(TABLE_NAME, entryColumnData);
 
@@ -55,7 +57,7 @@ public class SqlStatementFactoryTest {
   @Test
   public void getDestroySqlString() throws Exception {
     String expectedStatement =
-        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_NAME);
+        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME);
 
     String statement = factory.createDestroySqlString(TABLE_NAME, entryColumnData);
 
@@ -65,7 +67,7 @@ public class SqlStatementFactoryTest {
   @Test
   public void getUpdateSqlString() throws Exception {
     String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
-        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_NAME);
+        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME);
 
     String statement = factory.createUpdateSqlString(TABLE_NAME, entryColumnData);
 
@@ -74,12 +76,69 @@ public class SqlStatementFactoryTest {
 
   @Test
   public void getInsertSqlString() throws Exception {
-    String expectedStatement = String.format("INSERT INTO %s (%s, %s, %s) VALUES (?,?,?)",
-        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_NAME);
+    String expectedStatement = String.format("INSERT INTO %s (%s,%s,%s) VALUES (?,?,?)",
+        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME);
 
     String statement = factory.createInsertSqlString(TABLE_NAME, entryColumnData);
 
     assertThat(statement).isEqualTo(expectedStatement);
   }
 
+  @Test
+  public void getInsertSqlStringGivenNoColumns() throws Exception {
+    valueColumnData.clear();
+    keyColumnData.clear();
+
+    String statement = factory.createInsertSqlString(TABLE_NAME, entryColumnData);
+
+    String expectedStatement = String.format("INSERT INTO %s () VALUES ()", TABLE_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getInsertSqlStringGivenMultipleKeys() throws Exception {
+    keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
+
+    String statement = factory.createInsertSqlString(TABLE_NAME, entryColumnData);
+
+    String expectedStatement = String.format("INSERT INTO %s (%s,%s,%s,%s) VALUES (?,?,?,?)",
+        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME, KEY_COLUMN_2_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getUpdateSqlStringGivenMultipleKeys() throws Exception {
+    keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
+
+    String statement = factory.createUpdateSqlString(TABLE_NAME, entryColumnData);
+
+    String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ? AND %s = ?",
+        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME, KEY_COLUMN_2_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getSelectQueryStringGivenMultipleKeys() throws Exception {
+    keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
+
+    String statement = factory.createSelectQueryString(TABLE_NAME, entryColumnData);
+
+    String expectedStatement =
+        String.format("SELECT * FROM %s WHERE %s = ? AND %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME,
+            KEY_COLUMN_2_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
+  @Test
+  public void getDestroySqlStringGivenMultipleKeys() throws Exception {
+    keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
+
+    String statement = factory.createDestroySqlString(TABLE_NAME, entryColumnData);
+
+    String expectedStatement =
+        String.format("DELETE FROM %s WHERE %s = ? AND %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME,
+            KEY_COLUMN_2_NAME);
+    assertThat(statement).isEqualTo(expectedStatement);
+  }
+
 }
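
The expected statements above share one pattern: each key column contributes a "column = ?" term, joined with AND. A sketch consistent with those expectations (not the SqlStatementFactory source; WhereClauseSketch and selectFor are illustrative names):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class WhereClauseSketch {
      // Joins one "column = ?" term per key column with AND, matching the
      // expected SELECT/DELETE/UPDATE strings asserted in the test above.
      static String selectFor(String tableName, List<String> keyColumnNames) {
        String where = keyColumnNames.stream()
            .map(column -> column + " = ?")
            .collect(Collectors.joining(" AND "));
        return "SELECT * FROM " + tableName + " WHERE " + where;
      }

      public static void main(String[] args) {
        // Prints: SELECT * FROM testTable WHERE keyColumn1 = ? AND keyColumn2 = ?
        System.out.println(selectFor("testTable", Arrays.asList("keyColumn1", "keyColumn2")));
      }
    }
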
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
index 22fe169..df042df 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
@@ -30,6 +30,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Arrays;
 import java.util.Date;
 
 import junitparams.JUnitParamsRunner;
@@ -75,7 +76,7 @@ public class SqlToPdxInstanceCreatorTest {
     regionMapping = mock(RegionMapping.class);
     resultSet = mock(ResultSet.class);
     tableMetaDataView = mock(TableMetaDataView.class);
-    when(tableMetaDataView.getKeyColumnName()).thenReturn(KEY_COLUMN);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList(KEY_COLUMN));
   }
 
   @Test
@@ -109,7 +110,7 @@ public class SqlToPdxInstanceCreatorTest {
     when(regionMapping.getFieldNameForColumn(eq(COLUMN_NAME_2), any()))
         .thenReturn(PDX_FIELD_NAME_2);
     tableMetaDataView = mock(TableMetaDataView.class);
-    when(tableMetaDataView.getKeyColumnName()).thenReturn(COLUMN_NAME_1);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList(COLUMN_NAME_1));
     TypeRegistry pdxTypeRegistry = mock(TypeRegistry.class);
     when(cache.getPdxRegistry()).thenReturn(pdxTypeRegistry);
     String pdxClassName = "myPdxClassName";
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
index 91e8a73..9ce9874 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.geode.connectors.jdbc.JdbcConnectorException;
 public class TableMetaDataManagerTest {
   private static final String TABLE_NAME = "testTable";
   private static final String KEY_COLUMN = "keyColumn";
+  private static final String KEY_COLUMN2 = "keyColumn2";
 
   private TableMetaDataManager tableMetaDataManager;
   private Connection connection;
@@ -69,14 +70,26 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "");
-    assertThat(data.getKeyColumnName()).isEqualTo(KEY_COLUMN);
+
+    assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN));
     verify(connection).getMetaData();
   }
 
   @Test
+  public void returnsCompositePrimaryKeyColumnNames() throws Exception {
+    setupCompositePrimaryKeysMetaData();
+
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "");
+
+    assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN, KEY_COLUMN2));
+    verify(connection).getMetaData();
+  }
+
+  @Test
   public void givenNoColumnsAndNonNullIdsThenExpectException() throws Exception {
-    when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
-    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    setupTableMetaData();
     when(columnResultSet.next()).thenReturn(false);
 
     assertThatThrownBy(
@@ -87,8 +100,7 @@ public class TableMetaDataManagerTest {
 
   @Test
   public void givenOneColumnAndNonNullIdsThatDoesNotMatchThenExpectException() throws Exception {
-    when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
-    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    setupTableMetaData();
     when(columnResultSet.next()).thenReturn(true).thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("existingColumn");
 
@@ -101,8 +113,7 @@ public class TableMetaDataManagerTest {
   @Test
   public void givenTwoColumnsAndNonNullIdsThatDoesNotExactlyMatchThenExpectException()
       throws Exception {
-    when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
-    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    setupTableMetaData();
     when(columnResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("nonexistentid")
         .thenReturn("NONEXISTENTID");
@@ -116,8 +127,7 @@ public class TableMetaDataManagerTest {
   @Test
   public void givenThreeColumnsAndNonNullIdsThatDoesExactlyMatchThenKeyColumnNameIsReturned()
       throws Exception {
-    when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
-    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    setupTableMetaData();
     when(columnResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true)
         .thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("existentid").thenReturn("EXISTENTID")
@@ -126,21 +136,37 @@ public class TableMetaDataManagerTest {
     TableMetaDataView data =
         tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "ExistentId");
 
-    assertThat(data.getKeyColumnName()).isEqualTo("ExistentId");
+    assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList("ExistentId"));
+  }
+
+  @Test
+  public void givenFourColumnsAndCompositeIdsThenOnlyKeyColumnNamesAreReturned()
+      throws Exception {
+    setupTableMetaData();
+    when(columnResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true)
+        .thenReturn(true).thenReturn(false);
+    when(columnResultSet.getString("COLUMN_NAME")).thenReturn("LeadingNonKeyColumn")
+        .thenReturn(KEY_COLUMN).thenReturn(KEY_COLUMN2)
+        .thenReturn("NonKeyColumn");
+
+    TableMetaDataView data =
+        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME,
+            KEY_COLUMN + "," + KEY_COLUMN2);
+
+    assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN, KEY_COLUMN2));
   }
 
   @Test
   public void givenColumnAndNonNullIdsThatDoesInexactlyMatchThenKeyColumnNameIsReturned()
       throws Exception {
-    when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
-    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
+    setupTableMetaData();
     when(columnResultSet.next()).thenReturn(true).thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("existentid");
 
     TableMetaDataView data =
         tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "ExistentId");
 
-    assertThat(data.getKeyColumnName()).isEqualTo("ExistentId");
+    assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList("ExistentId"));
   }
 
   @Test
@@ -201,17 +227,6 @@ public class TableMetaDataManagerTest {
   }
 
   @Test
-  public void throwsExceptionIfTableHasCompositePrimaryKey() throws Exception {
-    setupPrimaryKeysMetaData();
-    when(primaryKeysResultSet.next()).thenReturn(true);
-
-    assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null))
-            .isInstanceOf(JdbcConnectorException.class)
-            .hasMessage("The table " + TABLE_NAME + " has more than one primary key column.");
-  }
-
-  @Test
   public void returnsExactMatchTableNameWhenTwoTablesHasCaseInsensitiveSameName() throws Exception {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
@@ -360,7 +375,20 @@ public class TableMetaDataManagerTest {
 
   private void setupPrimaryKeysMetaData() throws SQLException {
     when(primaryKeysResultSet.getString("COLUMN_NAME")).thenReturn(KEY_COLUMN);
+    setupTableMetaData();
+  }
+
+  private void setupCompositePrimaryKeysMetaData() throws SQLException {
+    when(primaryKeysResultSet.getString("COLUMN_NAME")).thenReturn(KEY_COLUMN)
+        .thenReturn(KEY_COLUMN2);
+    when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    setupTableMetaData();
+  }
+
+  private void setupTableMetaData() throws SQLException {
     when(tablesResultSet.next()).thenReturn(true).thenReturn(false);
     when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
   }
 }
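
The composite-primary-key tests above mock DatabaseMetaData.getPrimaryKeys to return two rows. A sketch of how such metadata yields the key column list (an illustration of what the mocks simulate, not the TableMetaDataManager source; PrimaryKeysSketch and keyColumnNames are illustrative names):

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    public class PrimaryKeysSketch {
      // Every row returned by getPrimaryKeys contributes one key column name,
      // so a table with a composite primary key yields more than one entry.
      static List<String> keyColumnNames(Connection connection, String tableName)
          throws SQLException {
        DatabaseMetaData metaData = connection.getMetaData();
        List<String> keys = new ArrayList<>();
        try (ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName)) {
          while (primaryKeys.next()) {
            keys.add(primaryKeys.getString("COLUMN_NAME"));
          }
        }
        return keys;
      }
    }
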