You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/01/10 20:16:53 UTC

[geode] branch develop updated: GEODE-6225: add catalog and schema support to jdbc (#3063)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0b743e2  GEODE-6225: add catalog and schema support to jdbc (#3063)
0b743e2 is described below

commit 0b743e23939e7fe746678cd51302ed8f1dbebfbd
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Jan 10 12:16:41 2019 -0800

    GEODE-6225: add catalog and schema support to jdbc (#3063)
---
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |   2 +-
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |  80 +++--
 .../connectors/jdbc/JdbcWriterIntegrationTest.java | 151 +++++++--
 .../jdbc/MySqlJdbcLoaderIntegrationTest.java       |   5 +
 .../jdbc/MySqlJdbcWriterIntegrationTest.java       |   5 +
 .../jdbc/PostgresJdbcLoaderIntegrationTest.java    |  40 +++
 .../jdbc/PostgresJdbcWriterIntegrationTest.java    |  62 ++++
 .../MySqlTableMetaDataManagerIntegrationTest.java  |   7 +
 ...ostgresTableMetaDataManagerIntegrationTest.java |  35 +++
 .../TableMetaDataManagerIntegrationTest.java       |  89 +++++-
 .../jdbc/internal/TestConfigService.java           |  24 +-
 .../cli/CreateMappingCommandDUnitTest.java         |  10 +-
 .../cli/DescribeMappingCommandDUnitTest.java       |   8 +-
 .../internal/cli/ListMappingCommandDUnitTest.java  |   2 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |  10 +-
 .../jdbc/internal/SqlStatementFactory.java         |  32 +-
 .../connectors/jdbc/internal/TableMetaData.java    |  39 ++-
 .../jdbc/internal/TableMetaDataManager.java        | 151 ++++++---
 .../jdbc/internal/TableMetaDataView.java           |   7 +-
 .../jdbc/internal/cli/CreateMappingCommand.java    |  16 +-
 .../jdbc/internal/cli/DescribeMappingCommand.java  |   4 +
 .../jdbc/internal/configuration/RegionMapping.java |  62 ++--
 .../connectors/jdbc/internal/xml/ElementType.java  |   2 +
 .../xml/JdbcConnectorServiceXmlParser.java         |   2 +
 .../geode.apache.org/schema/jdbc/jdbc-1.0.xsd      |   2 +
 .../sanctioned-geode-connectors-serializables.txt  |   2 +-
 .../jdbc/internal/RegionMappingTest.java           | 156 +++++++---
 .../connectors/jdbc/internal/SqlHandlerTest.java   |   5 +-
 .../jdbc/internal/SqlStatementFactoryTest.java     |  56 ++--
 .../jdbc/internal/TableMetaDataManagerTest.java    | 339 +++++++++++++++++++--
 .../jdbc/internal/TableMetaDataTest.java           | 157 ++++++++++
 .../internal/cli/CreateMappingCommandTest.java     |  24 +-
 .../internal/cli/CreateMappingFunctionTest.java    |   2 +-
 .../internal/cli/DescribeMappingCommandTest.java   |   6 +-
 .../jdbc/internal/cli/ListMappingCommandTest.java  |   4 +-
 .../jdbc/internal/xml/ElementTypeTest.java         |   6 +
 36 files changed, 1307 insertions(+), 297 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index b7d6d38..1557890 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -338,7 +338,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   private SqlHandler createSqlHandler(String ids)
       throws RegionMappingExistsException {
     return new SqlHandler(new TableMetaDataManager(),
-        TestConfigService.getTestConfigService(getConnectionUrl(), ids),
+        TestConfigService.getTestConfigService(ids),
         testDataSourceFactory);
   }
 
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index 241e2ee..33d2243 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -47,12 +47,12 @@ import org.apache.geode.pdx.internal.AutoSerializableManager;
 public abstract class JdbcLoaderIntegrationTest {
 
   static final String DB_NAME = "test";
+  protected static final String SCHEMA_NAME = "mySchema";
+  protected static final String REGION_TABLE_NAME = "employees";
 
-  private static final String REGION_TABLE_NAME = "employees";
-
-  private InternalCache cache;
-  private Connection connection;
-  private Statement statement;
+  protected InternalCache cache;
+  protected Connection connection;
+  protected Statement statement;
 
   @Rule
   public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
@@ -90,11 +90,19 @@ public abstract class JdbcLoaderIntegrationTest {
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
   }
 
+  private void createEmployeeTableWithSchema() throws Exception {
+    statement.execute("CREATE SCHEMA " + SCHEMA_NAME);
+    statement.execute("Create Table " + SCHEMA_NAME + '.' + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
   private void closeDB() throws Exception {
     if (statement == null) {
       statement = connection.createStatement();
     }
-    statement.execute("Drop table " + REGION_TABLE_NAME);
+    statement.execute("Drop table IF EXISTS " + REGION_TABLE_NAME);
+    statement.execute("Drop table IF EXISTS " + SCHEMA_NAME + "." + REGION_TABLE_NAME);
+    statement.execute("Drop schema IF EXISTS " + SCHEMA_NAME);
     statement.close();
 
     if (connection != null) {
@@ -125,7 +133,38 @@ public abstract class JdbcLoaderIntegrationTest {
         .execute("Insert into " + REGION_TABLE_NAME + "(id, name, age) values('1', 'Emp1', 21)");
     String ids = "id,name";
     Region<String, Employee> region =
-        createRegionWithJDBCLoader(REGION_TABLE_NAME, Employee.class.getName(), ids);
+        createRegionWithJDBCLoader(REGION_TABLE_NAME, Employee.class.getName(), ids, null, null);
+    createPdxType();
+
+    JSONObject key = new JSONObject();
+    key.put("id", "1");
+    key.put("name", "Emp1");
+    Employee value = region.get(key.toString());
+
+    assertThat(value.getId()).isEqualTo("1");
+    assertThat(value.getName()).isEqualTo("Emp1");
+    assertThat(value.getAge()).isEqualTo(21);
+  }
+
+  @Test
+  public void verifyGetWithSchemaAndPdxClassNameAndCompositeKey() throws Exception {
+    createEmployeeTableWithSchema();
+    statement
+        .execute("Insert into " + SCHEMA_NAME + '.' + REGION_TABLE_NAME
+            + "(id, name, age) values('1', 'Emp1', 21)");
+    String ids = "id,name";
+    String catalog;
+    String schema;
+    if (vendorSupportsSchemas()) {
+      catalog = null;
+      schema = SCHEMA_NAME;
+    } else {
+      catalog = SCHEMA_NAME;
+      schema = null;
+    }
+    Region<String, Employee> region =
+        createRegionWithJDBCLoader(REGION_TABLE_NAME, Employee.class.getName(), ids, catalog,
+            schema);
     createPdxType();
 
     JSONObject key = new JSONObject();
@@ -138,6 +177,8 @@ public abstract class JdbcLoaderIntegrationTest {
     assertThat(value.getAge()).isEqualTo(21);
   }
 
+  protected abstract boolean vendorSupportsSchemas();
+
   @Test
   public void verifyGetWithSupportedFieldsWithPdxClassName() throws Exception {
     createClassWithSupportedPdxFieldsTable(statement, REGION_TABLE_NAME);
@@ -153,11 +194,11 @@ public abstract class JdbcLoaderIntegrationTest {
     assertThat(value).isEqualTo(classWithSupportedPdxFields);
   }
 
-  private void createPdxType() throws IOException {
+  protected void createPdxType() throws IOException {
     createPdxType(new Employee("id", "name", 45));
   }
 
-  private void createPdxType(Object value) throws IOException {
+  protected void createPdxType(Object value) throws IOException {
     // the following serialization will add a pdxType
     BlobHelper.serializeToBlob(value);
   }
@@ -170,30 +211,31 @@ public abstract class JdbcLoaderIntegrationTest {
     assertThat(pdx).isNull();
   }
 
-  private SqlHandler createSqlHandler(String pdxClassName, String ids)
+  protected SqlHandler createSqlHandler(String pdxClassName, String ids, String catalog,
+      String schema)
       throws RegionMappingExistsException {
     return new SqlHandler(new TableMetaDataManager(),
-        TestConfigService.getTestConfigService((InternalCache) cache, pdxClassName,
-            getConnectionUrl(), ids),
+        TestConfigService.getTestConfigService((InternalCache) cache, pdxClassName, ids, catalog,
+            schema),
         testDataSourceFactory);
   }
 
-  private <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
-      String ids)
+  protected <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
+      String ids, String catalog, String schema)
       throws RegionMappingExistsException {
     JdbcLoader<K, V> jdbcLoader =
-        new JdbcLoader<>(createSqlHandler(pdxClassName, ids), cache);
+        new JdbcLoader<>(createSqlHandler(pdxClassName, ids, catalog, schema), cache);
     RegionFactory<K, V> regionFactory = cache.createRegionFactory(REPLICATE);
     regionFactory.setCacheLoader(jdbcLoader);
     return regionFactory.create(regionName);
   }
 
-  private <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName)
+  protected <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName)
       throws RegionMappingExistsException {
-    return createRegionWithJDBCLoader(regionName, pdxClassName, null);
+    return createRegionWithJDBCLoader(regionName, pdxClassName, null, null, null);
   }
 
-  private ClassWithSupportedPdxFields createClassWithSupportedPdxFieldsForInsert(String key) {
+  protected ClassWithSupportedPdxFields createClassWithSupportedPdxFieldsForInsert(String key) {
     ClassWithSupportedPdxFields classWithSupportedPdxFields =
         new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2, 3, 4, 5.5f, 6.0, "BigEmp",
             new Date(0), "BigEmpObject", new byte[] {1, 2}, 'c');
@@ -201,7 +243,7 @@ public abstract class JdbcLoaderIntegrationTest {
     return classWithSupportedPdxFields;
   }
 
-  private void insertIntoClassWithSupportedPdxFieldsTable(String id,
+  protected void insertIntoClassWithSupportedPdxFieldsTable(String id,
       ClassWithSupportedPdxFields classWithSupportedPdxFields) throws Exception {
     String insertString =
         "Insert into " + REGION_TABLE_NAME + " values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
index 4db143f..7432534 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -44,19 +44,22 @@ import org.apache.geode.pdx.WritablePdxInstance;
 public abstract class JdbcWriterIntegrationTest {
 
   static final String DB_NAME = "test";
-  private static final String REGION_TABLE_NAME = "employees";
-
-  private InternalCache cache;
-  private Region<String, PdxInstance> employees;
-  private Connection connection;
-  private Statement statement;
-  private JdbcWriter jdbcWriter;
-  private PdxInstance pdx1;
-  private PdxInstance pdx2;
-  private Employee employee1;
-  private Employee employee2;
-  private final TestDataSourceFactory testDataSourceFactory =
+  protected static final String SCHEMA_NAME = "mySchema";
+  protected static final String REGION_TABLE_NAME = "employees";
+
+  protected InternalCache cache;
+  protected Region<String, PdxInstance> employees;
+  protected Connection connection;
+  protected Statement statement;
+  protected JdbcWriter jdbcWriter;
+  protected PdxInstance pdx1;
+  protected PdxInstance pdx2;
+  protected Employee employee1;
+  protected Employee employee2;
+  protected final TestDataSourceFactory testDataSourceFactory =
       new TestDataSourceFactory(getConnectionUrl());
+  protected String catalog;
+  protected String schema;
 
   @Before
   public void setUp() throws Exception {
@@ -65,8 +68,6 @@ public abstract class JdbcWriterIntegrationTest {
 
     connection = getConnection();
     statement = connection.createStatement();
-    statement.execute("Create Table " + REGION_TABLE_NAME
-        + " (id varchar(10) primary key not null, name varchar(10), age int)");
     pdx1 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("id", "1")
         .writeString("name", "Emp1")
         .writeInt("age", 55).create();
@@ -75,10 +76,47 @@ public abstract class JdbcWriterIntegrationTest {
         .writeInt("age", 21).create();
     employee1 = (Employee) pdx1.getObject();
     employee2 = (Employee) pdx2.getObject();
+    createTableInUnusedSchema();
   }
 
-  private void setupRegion(String ids) throws RegionMappingExistsException {
-    employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME, ids);
+  protected void createTable() throws SQLException {
+    statement.execute("Create Table " + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  protected void createTableWithSchema() throws SQLException {
+    statement.execute("Create Schema " + SCHEMA_NAME);
+    statement.execute("Create Table " + SCHEMA_NAME + '.' + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  protected void createTableInUnusedSchema() throws SQLException {
+    Connection connection2 = getConnection();
+    statement.execute("Create Schema unusedSchema");
+    statement = connection2.createStatement();
+    statement.execute("Create Table " + "unusedSchema." + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  protected void setupRegion(String ids) throws RegionMappingExistsException {
+    sharedRegionSetup(ids, null, null);
+  }
+
+  protected void sharedRegionSetup(String ids, String catalog, String schema)
+      throws RegionMappingExistsException {
+    employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME, ids, catalog, schema);
+  }
+
+  protected void setupRegionWithSchema(String ids) throws RegionMappingExistsException {
+    if (vendorSupportsSchemas()) {
+      catalog = null;
+      schema = SCHEMA_NAME;
+    } else {
+      catalog = SCHEMA_NAME;
+      schema = null;
+
+    }
+    sharedRegionSetup(ids, catalog, schema);
   }
 
   @After
@@ -98,7 +136,11 @@ public abstract class JdbcWriterIntegrationTest {
       }
     }
     if (statement != null) {
-      statement.execute("Drop table " + REGION_TABLE_NAME);
+      statement.execute("Drop table IF EXISTS " + REGION_TABLE_NAME);
+      statement.execute("Drop table IF EXISTS unusedSchema." + REGION_TABLE_NAME);
+      statement.execute("Drop schema IF EXISTS unusedSchema");
+      statement.execute("Drop table IF EXISTS " + SCHEMA_NAME + '.' + REGION_TABLE_NAME);
+      statement.execute("Drop schema IF EXISTS " + SCHEMA_NAME);
       statement.close();
     }
     if (connection != null) {
@@ -109,6 +151,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canInsertIntoTable() throws Exception {
+    createTable();
     setupRegion(null);
     employees.put("1", pdx1);
     employees.put("2", pdx2);
@@ -120,8 +163,26 @@ public abstract class JdbcWriterIntegrationTest {
     assertThat(resultSet.next()).isFalse();
   }
 
+  protected abstract boolean vendorSupportsSchemas();
+
+  @Test
+  public void canInsertIntoTableWithSchema() throws Exception {
+    createTableWithSchema();
+    setupRegionWithSchema(null);
+    employees.put("1", pdx1);
+    employees.put("2", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery(
+            "select * from " + SCHEMA_NAME + '.' + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
   @Test
   public void canInsertIntoTableWithCompositeKey() throws Exception {
+    createTable();
     setupRegion("id,age");
     JSONObject compositeKey1 = new JSONObject();
     compositeKey1.put("id", pdx1.getField("id"));
@@ -143,6 +204,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canPutAllInsertIntoTable() throws Exception {
+    createTable();
     setupRegion(null);
     Map<String, PdxInstance> putAllMap = new HashMap<>();
     putAllMap.put("1", pdx1);
@@ -158,6 +220,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
+    createTable();
     setupRegion(null);
     PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName())
         .writeString("name", "Emp1").writeInt("age", 55).writeString("id", "3").create();
@@ -170,7 +233,8 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
-  public void putNonPdxInstanceFails() throws RegionMappingExistsException {
+  public void putNonPdxInstanceFails() throws Exception {
+    createTable();
     setupRegion(null);
     Region nonPdxEmployees = this.employees;
     Throwable thrown = catchThrowable(() -> nonPdxEmployees.put("1", "non pdx instance"));
@@ -180,6 +244,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void putNonPdxInstanceThatIsPdxSerializable()
       throws SQLException, RegionMappingExistsException {
+    createTable();
     setupRegion(null);
     Region nonPdxEmployees = this.employees;
     Employee value = new Employee("2", "Emp2", 22);
@@ -193,6 +258,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canDestroyFromTable() throws Exception {
+    createTable();
     setupRegion(null);
     employees.put("1", pdx1);
     employees.put("2", pdx2);
@@ -206,7 +272,24 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
+  public void canDestroyFromTableWithSchema() throws Exception {
+    createTableWithSchema();
+    setupRegionWithSchema(null);
+    employees.put("1", pdx1);
+    employees.put("2", pdx2);
+
+    employees.destroy("1");
+
+    ResultSet resultSet =
+        statement.executeQuery(
+            "select * from " + SCHEMA_NAME + '.' + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canDestroyFromTableWithCompositeKey() throws Exception {
+    createTable();
     setupRegion("id,age");
     JSONObject compositeKey1 = new JSONObject();
     compositeKey1.put("id", pdx1.getField("id"));
@@ -227,6 +310,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canUpdateTable() throws Exception {
+    createTable();
     setupRegion(null);
     employees.put("1", pdx1);
     employees.put("1", pdx2);
@@ -238,7 +322,22 @@ public abstract class JdbcWriterIntegrationTest {
   }
 
   @Test
+  public void canUpdateTableWithSchema() throws Exception {
+    createTableWithSchema();
+    setupRegionWithSchema(null);
+    employees.put("1", pdx1);
+    employees.put("1", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery(
+            "select * from " + SCHEMA_NAME + '.' + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
   public void canUpdateTableWithCompositeKey() throws Exception {
+    createTable();
     setupRegion("id,age");
     PdxInstance myPdx = cache.createPdxInstanceFactory(Employee.class.getName())
         .writeString("id", "1").writeString("name", "Emp1")
@@ -261,6 +360,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canUpdateBecomeInsert() throws Exception {
+    createTable();
     setupRegion(null);
     employees.put("1", pdx1);
 
@@ -277,6 +377,7 @@ public abstract class JdbcWriterIntegrationTest {
 
   @Test
   public void canInsertBecomeUpdate() throws Exception {
+    createTable();
     setupRegion(null);
     statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
@@ -289,10 +390,10 @@ public abstract class JdbcWriterIntegrationTest {
     assertThat(resultSet.next()).isFalse();
   }
 
-  private Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName,
-      String ids)
+  protected Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName,
+      String ids, String catalog, String schema)
       throws RegionMappingExistsException {
-    jdbcWriter = new JdbcWriter(createSqlHandler(ids), cache);
+    jdbcWriter = new JdbcWriter(createSqlHandler(ids, catalog, schema), cache);
 
     RegionFactory<String, PdxInstance> regionFactory =
         cache.createRegionFactory(RegionShortcut.REPLICATE);
@@ -300,21 +401,21 @@ public abstract class JdbcWriterIntegrationTest {
     return regionFactory.create(regionName);
   }
 
-  private void validateTableRowCount(int expected) throws Exception {
+  protected void validateTableRowCount(int expected) throws Exception {
     ResultSet resultSet = statement.executeQuery("select count(*) from " + REGION_TABLE_NAME);
     resultSet.next();
     int size = resultSet.getInt(1);
     assertThat(size).isEqualTo(expected);
   }
 
-  private SqlHandler createSqlHandler(String ids)
+  protected SqlHandler createSqlHandler(String ids, String catalog, String schema)
       throws RegionMappingExistsException {
     return new SqlHandler(new TableMetaDataManager(),
-        TestConfigService.getTestConfigService(getConnectionUrl(), ids),
+        TestConfigService.getTestConfigService(cache, null, ids, catalog, schema),
         testDataSourceFactory);
   }
 
-  private void assertRecordMatchesEmployee(ResultSet resultSet, String id, Employee employee)
+  protected void assertRecordMatchesEmployee(ResultSet resultSet, String id, Employee employee)
       throws SQLException {
     assertThat(resultSet.next()).isTrue();
     assertThat(resultSet.getString("id")).isEqualTo(id);
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java
index 30bc01d..14562d3 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java
@@ -52,4 +52,9 @@ public class MySqlJdbcLoaderIntegrationTest extends JdbcLoaderIntegrationTest {
         + "adate datetime, " + "anobject varchar(20), " + "abytearray blob(100), "
         + "achar char(1))");
   }
+
+  @Override
+  protected boolean vendorSupportsSchemas() {
+    return false;
+  }
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcWriterIntegrationTest.java
index 012ac1b..f809084 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcWriterIntegrationTest.java
@@ -41,4 +41,9 @@ public class MySqlJdbcWriterIntegrationTest extends JdbcWriterIntegrationTest {
   public String getConnectionUrl() {
     return dbRule.getConnectionUrl();
   }
+
+  @Override
+  protected boolean vendorSupportsSchemas() {
+    return false;
+  }
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java
index f7d47d2..aaf641a 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java
@@ -14,13 +14,18 @@
  */
 package org.apache.geode.connectors.jdbc;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 
+import org.json.JSONObject;
 import org.junit.ClassRule;
+import org.junit.Test;
 
+import org.apache.geode.cache.Region;
 import org.apache.geode.test.junit.rules.DatabaseConnectionRule;
 import org.apache.geode.test.junit.rules.PostgresConnectionRule;
 
@@ -51,4 +56,39 @@ public class PostgresJdbcLoaderIntegrationTest extends JdbcLoaderIntegrationTest
         + "along bigint, " + "afloat float, " + "adouble float, " + "astring varchar(10), "
         + "adate timestamp, " + "anobject varchar(20), " + "abytearray bytea, " + "achar char(1))");
   }
+
+  @Override
+  protected boolean vendorSupportsSchemas() {
+    return true;
+  }
+
+  private void createEmployeeTableWithSchemaAndCatalog() throws Exception {
+    statement.execute("CREATE SCHEMA " + SCHEMA_NAME);
+    statement.execute("Create Table " + DB_NAME + '.' + SCHEMA_NAME + '.' + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  @Test
+  public void verifyGetWithSchemaCatalogAndPdxClassNameAndCompositeKey() throws Exception {
+    createEmployeeTableWithSchemaAndCatalog();
+    statement
+        .execute("Insert into " + DB_NAME + '.' + SCHEMA_NAME + '.' + REGION_TABLE_NAME
+            + "(id, name, age) values('1', 'Emp1', 21)");
+    String ids = "id,name";
+    Region<String, Employee> region =
+        createRegionWithJDBCLoader(REGION_TABLE_NAME, Employee.class.getName(), ids, DB_NAME,
+            SCHEMA_NAME);
+    createPdxType();
+
+    JSONObject key = new JSONObject();
+    key.put("id", "1");
+    key.put("name", "Emp1");
+    Employee value = region.get(key.toString());
+
+    assertThat(value.getId()).isEqualTo("1");
+    assertThat(value.getName()).isEqualTo("Emp1");
+    assertThat(value.getAge()).isEqualTo(21);
+  }
+
+
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java
index 7a5c76a..8f4a4af 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java
@@ -14,11 +14,15 @@
  */
 package org.apache.geode.connectors.jdbc;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.net.URL;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 
 import org.junit.ClassRule;
+import org.junit.Test;
 
 import org.apache.geode.test.junit.rules.DatabaseConnectionRule;
 import org.apache.geode.test.junit.rules.PostgresConnectionRule;
@@ -41,4 +45,62 @@ public class PostgresJdbcWriterIntegrationTest extends JdbcWriterIntegrationTest
   public String getConnectionUrl() {
     return dbRule.getConnectionUrl();
   }
+
+  @Override
+  protected boolean vendorSupportsSchemas() {
+    return true;
+  }
+
+
+  protected void createTableWithCatalogAndSchema() throws SQLException {
+    statement.execute("Create Schema " + SCHEMA_NAME);
+    statement.execute("Create Table " + DB_NAME + '.' + SCHEMA_NAME + '.' + REGION_TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  @Test
+  public void canDestroyFromTableWithCatalogAndSchema() throws Exception {
+    createTableWithCatalogAndSchema();
+    sharedRegionSetup(null, DB_NAME, SCHEMA_NAME);
+    employees.put("1", pdx1);
+    employees.put("2", pdx2);
+
+    employees.destroy("1");
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + DB_NAME + '.' + SCHEMA_NAME + '.'
+            + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canInsertIntoTableWithCatalogAndSchema() throws Exception {
+    createTableWithCatalogAndSchema();
+    sharedRegionSetup(null, DB_NAME, SCHEMA_NAME);
+    employees.put("1", pdx1);
+    employees.put("2", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + DB_NAME + '.' + SCHEMA_NAME + '.'
+            + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  @Test
+  public void canUpdateTableWithCatalogAndSchema() throws Exception {
+    createTableWithCatalogAndSchema();
+    sharedRegionSetup(null, DB_NAME, SCHEMA_NAME);
+    employees.put("1", pdx1);
+    employees.put("1", pdx2);
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + DB_NAME + '.' + SCHEMA_NAME + '.'
+            + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
+  }
+
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/MySqlTableMetaDataManagerIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/MySqlTableMetaDataManagerIntegrationTest.java
index 7684542..3ff0e72 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/MySqlTableMetaDataManagerIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/MySqlTableMetaDataManagerIntegrationTest.java
@@ -20,6 +20,7 @@ import java.sql.SQLException;
 
 import org.junit.ClassRule;
 
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.test.junit.rules.DatabaseConnectionRule;
 import org.apache.geode.test.junit.rules.MySqlConnectionRule;
 
@@ -36,4 +37,10 @@ public class MySqlTableMetaDataManagerIntegrationTest extends TableMetaDataManag
   public Connection getConnection() throws SQLException {
     return dbRule.getConnection();
   }
+
+  @Override
+  protected void setSchemaOrCatalogOnMapping(RegionMapping regionMapping, String name) {
+    regionMapping.setCatalog(name);
+  }
+
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/PostgresTableMetaDataManagerIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/PostgresTableMetaDataManagerIntegrationTest.java
index 7fca1fd..5110a2f 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/PostgresTableMetaDataManagerIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/PostgresTableMetaDataManagerIntegrationTest.java
@@ -14,12 +14,19 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.net.URL;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.junit.ClassRule;
+import org.junit.Test;
 
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.test.junit.rules.DatabaseConnectionRule;
 import org.apache.geode.test.junit.rules.PostgresConnectionRule;
 
@@ -37,4 +44,32 @@ public class PostgresTableMetaDataManagerIntegrationTest
   public Connection getConnection() throws SQLException {
     return dbRule.getConnection();
   }
+
+  @Override
+  protected void setSchemaOrCatalogOnMapping(RegionMapping regionMapping, String name) {
+    regionMapping.setSchema(name);
+  }
+
+  protected void createTableWithSchemaAndCatalog() throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    String quote = metaData.getIdentifierQuoteString();
+    statement.execute("CREATE SCHEMA MYSCHEMA");
+    statement.execute(
+        "CREATE TABLE " + DB_NAME + ".MYSCHEMA." + REGION_TABLE_NAME + " (" + quote + "id" + quote
+            + " VARCHAR(10) primary key not null," + quote + "name" + quote + " VARCHAR(10),"
+            + quote
+            + "age" + quote + " int)");
+  }
+
+  @Test
+  public void validateKeyColumnNameWithSchemaAndCatalog() throws SQLException {
+    createTableWithSchemaAndCatalog();
+    regionMapping.setSchema("MYSCHEMA");
+    regionMapping.setCatalog(DB_NAME);
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
+
+    List<String> keyColumnNames = metaData.getKeyColumnNames();
+
+    assertThat(keyColumnNames).isEqualTo(Arrays.asList("id"));
+  }
 }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
index 485745e..1ea0fdb 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerIntegrationTest.java
@@ -17,6 +17,7 @@
 package org.apache.geode.connectors.jdbc.internal;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -30,21 +31,27 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
+
 
 public abstract class TableMetaDataManagerIntegrationTest {
 
-  private static final String REGION_TABLE_NAME = "employees";
+  protected static final String REGION_TABLE_NAME = "employees";
   protected static final String DB_NAME = "test";
 
-  private TableMetaDataManager manager;
+  protected TableMetaDataManager manager;
   protected Connection connection;
   protected Statement statement;
+  protected RegionMapping regionMapping;
 
   @Before
   public void setup() throws Exception {
     connection = getConnection();
     statement = connection.createStatement();
     manager = new TableMetaDataManager();
+    regionMapping = new RegionMapping();
+    regionMapping.setTableName(REGION_TABLE_NAME);
   }
 
   @After
@@ -56,7 +63,9 @@ public abstract class TableMetaDataManagerIntegrationTest {
     if (statement == null) {
       statement = connection.createStatement();
     }
-    statement.execute("Drop table " + REGION_TABLE_NAME);
+    statement.execute("Drop table IF EXISTS " + REGION_TABLE_NAME);
+    statement.execute("Drop table IF EXISTS MYSCHEMA." + REGION_TABLE_NAME);
+    statement.execute("Drop schema IF EXISTS MYSCHEMA");
     statement.close();
 
     if (connection != null) {
@@ -74,6 +83,15 @@ public abstract class TableMetaDataManagerIntegrationTest {
         + "age" + quote + " int)");
   }
 
+  protected void createTableWithSchema() throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    String quote = metaData.getIdentifierQuoteString();
+    statement.execute("CREATE SCHEMA MYSCHEMA");
+    statement.execute("CREATE TABLE MYSCHEMA." + REGION_TABLE_NAME + " (" + quote + "id" + quote
+        + " VARCHAR(10) primary key not null," + quote + "name" + quote + " VARCHAR(10)," + quote
+        + "age" + quote + " int)");
+  }
+
   protected void createTableWithNoPrimaryKey() throws SQLException {
     DatabaseMetaData metaData = connection.getMetaData();
     String quote = metaData.getIdentifierQuoteString();
@@ -82,21 +100,73 @@ public abstract class TableMetaDataManagerIntegrationTest {
         + "age" + quote + " int)");
   }
 
+  protected void createTableWithMultiplePrimaryKeys() throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    String quote = metaData.getIdentifierQuoteString();
+    statement.execute("CREATE TABLE " + REGION_TABLE_NAME + " (" + quote + "id" + quote
+        + " VARCHAR(10)," + quote + "name" + quote + " VARCHAR(10)," + quote
+        + "age" + quote + " int, PRIMARY KEY (id, name))");
+  }
+
   @Test
   public void validateKeyColumnName() throws SQLException {
     createTable();
-    TableMetaDataView metaData = manager.getTableMetaDataView(connection, REGION_TABLE_NAME, null);
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
 
     List<String> keyColumnNames = metaData.getKeyColumnNames();
 
     assertThat(keyColumnNames).isEqualTo(Arrays.asList("id"));
   }
 
+  protected abstract void setSchemaOrCatalogOnMapping(RegionMapping regionMapping, String name);
+
+  @Test
+  public void validateKeyColumnNameWithSchema() throws SQLException {
+    createTableWithSchema();
+    setSchemaOrCatalogOnMapping(regionMapping, "MYSCHEMA");
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
+
+    List<String> keyColumnNames = metaData.getKeyColumnNames();
+
+    assertThat(keyColumnNames).isEqualTo(Arrays.asList("id"));
+  }
+
+  @Test
+  public void validateUnknownSchema() throws SQLException {
+    createTable();
+    regionMapping.setSchema("unknownSchema");
+    assertThatThrownBy(
+        () -> manager.getTableMetaDataView(connection, regionMapping))
+            .isInstanceOf(JdbcConnectorException.class)
+            .hasMessageContaining("No schema was found that matches \"unknownSchema\"");
+  }
+
+  @Test
+  public void validateUnknownCatalog() throws SQLException {
+    createTable();
+    regionMapping.setCatalog("unknownCatalog");
+    assertThatThrownBy(
+        () -> manager.getTableMetaDataView(connection, regionMapping))
+            .isInstanceOf(JdbcConnectorException.class)
+            .hasMessageContaining("No catalog was found that matches \"unknownCatalog\"");
+  }
+
+  @Test
+  public void validateMultipleKeyColumnNames() throws SQLException {
+    createTableWithMultiplePrimaryKeys();
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
+
+    List<String> keyColumnNames = metaData.getKeyColumnNames();
+
+    assertThat(keyColumnNames).isEqualTo(Arrays.asList("id", "name"));
+  }
+
   @Test
   public void validateKeyColumnNameOnNonPrimaryKey() throws SQLException {
     createTableWithNoPrimaryKey();
+    regionMapping.setIds("nonprimaryid");
     TableMetaDataView metaData =
-        manager.getTableMetaDataView(connection, REGION_TABLE_NAME, "nonprimaryid");
+        manager.getTableMetaDataView(connection, regionMapping);
 
     List<String> keyColumnNames = metaData.getKeyColumnNames();
 
@@ -106,8 +176,9 @@ public abstract class TableMetaDataManagerIntegrationTest {
   @Test
   public void validateKeyColumnNameOnNonPrimaryKeyWithInExactMatch() throws SQLException {
     createTableWithNoPrimaryKey();
+    regionMapping.setIds("NonPrimaryId");
     TableMetaDataView metaData =
-        manager.getTableMetaDataView(connection, REGION_TABLE_NAME, "NonPrimaryId");
+        manager.getTableMetaDataView(connection, regionMapping);
 
     List<String> keyColumnNames = metaData.getKeyColumnNames();
 
@@ -117,7 +188,7 @@ public abstract class TableMetaDataManagerIntegrationTest {
   @Test
   public void validateColumnDataTypeForName() throws SQLException {
     createTable();
-    TableMetaDataView metaData = manager.getTableMetaDataView(connection, REGION_TABLE_NAME, null);
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
 
     int nameDataType = metaData.getColumnDataType("name");
 
@@ -127,7 +198,7 @@ public abstract class TableMetaDataManagerIntegrationTest {
   @Test
   public void validateColumnDataTypeForId() throws SQLException {
     createTable();
-    TableMetaDataView metaData = manager.getTableMetaDataView(connection, REGION_TABLE_NAME, null);
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
 
     int nameDataType = metaData.getColumnDataType("id");
 
@@ -137,7 +208,7 @@ public abstract class TableMetaDataManagerIntegrationTest {
   @Test
   public void validateColumnDataTypeForAge() throws SQLException {
     createTable();
-    TableMetaDataView metaData = manager.getTableMetaDataView(connection, REGION_TABLE_NAME, null);
+    TableMetaDataView metaData = manager.getTableMetaDataView(connection, regionMapping);
 
     int nameDataType = metaData.getColumnDataType("age");
 
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
index cd31ce6..3caa83f 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/internal/TestConfigService.java
@@ -29,29 +29,18 @@ public class TestConfigService {
   private static final String REGION_NAME = "employees";
   private static final String CONNECTION_CONFIG_NAME = "testConnectionConfig";
 
-  public static JdbcConnectorServiceImpl getTestConfigService(String connectionUrl)
+  public static JdbcConnectorServiceImpl getTestConfigService(String ids)
       throws RegionMappingExistsException {
-    return getTestConfigService(createMockCache(), null, connectionUrl);
-  }
-
-  public static JdbcConnectorServiceImpl getTestConfigService(String connectionUrl, String ids)
-      throws RegionMappingExistsException {
-    return getTestConfigService(createMockCache(), null, connectionUrl, ids);
-  }
-
-  public static JdbcConnectorServiceImpl getTestConfigService(InternalCache cache,
-      String pdxClassName, String connectionUrl)
-      throws RegionMappingExistsException {
-    return getTestConfigService(cache, pdxClassName, connectionUrl, null);
+    return getTestConfigService(createMockCache(), null, ids, null, null);
   }
 
   public static JdbcConnectorServiceImpl getTestConfigService(InternalCache cache,
-      String pdxClassName, String connectionUrl, String ids)
+      String pdxClassName, String ids, String catalog, String schema)
       throws RegionMappingExistsException {
 
     JdbcConnectorServiceImpl service = new JdbcConnectorServiceImpl();
     service.init(cache);
-    service.createRegionMapping(createRegionMapping(pdxClassName, ids));
+    service.createRegionMapping(createRegionMapping(pdxClassName, ids, catalog, schema));
     return service;
   }
 
@@ -61,8 +50,9 @@ public class TestConfigService {
     return cache;
   }
 
-  private static RegionMapping createRegionMapping(String pdxClassName, String ids) {
+  private static RegionMapping createRegionMapping(String pdxClassName, String ids, String catalog,
+      String schema) {
     return new RegionMapping(REGION_NAME, pdxClassName, REGION_TABLE_NAME,
-        CONNECTION_CONFIG_NAME, ids);
+        CONNECTION_CONFIG_NAME, ids, catalog, schema);
   }
 }
diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
index 21b48e2..dadc964 100644
--- a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
+++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandDUnitTest.java
@@ -15,10 +15,12 @@
 package org.apache.geode.connectors.jdbc.internal.cli;
 
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__CATALOG_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__DATA_SOURCE_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__ID_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__PDX_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__REGION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__SCHEMA_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__SYNCHRONOUS_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__TABLE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -185,6 +187,8 @@ public class CreateMappingCommandDUnitTest {
     csb.addOption(CREATE_MAPPING__TABLE_NAME, "myTable");
     csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
     csb.addOption(CREATE_MAPPING__ID_NAME, "myId");
+    csb.addOption(CREATE_MAPPING__CATALOG_NAME, "myCatalog");
+    csb.addOption(CREATE_MAPPING__SCHEMA_NAME, "mySchema");
 
     gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
 
@@ -194,6 +198,8 @@ public class CreateMappingCommandDUnitTest {
       assertThat(mapping.getTableName()).isEqualTo("myTable");
       assertThat(mapping.getPdxName()).isEqualTo("myPdxClass");
       assertThat(mapping.getIds()).isEqualTo("myId");
+      assertThat(mapping.getCatalog()).isEqualTo("myCatalog");
+      assertThat(mapping.getSchema()).isEqualTo("mySchema");
       validateRegionAlteredOnServer(regionName, false);
       validateAsyncEventQueueCreatedOnServer(regionName, false);
     });
@@ -204,6 +210,8 @@ public class CreateMappingCommandDUnitTest {
       assertThat(regionMapping.getTableName()).isEqualTo("myTable");
       assertThat(regionMapping.getPdxName()).isEqualTo("myPdxClass");
       assertThat(regionMapping.getIds()).isEqualTo("myId");
+      assertThat(regionMapping.getCatalog()).isEqualTo("myCatalog");
+      assertThat(regionMapping.getSchema()).isEqualTo("mySchema");
       validateRegionAlteredInClusterConfig(regionName, false);
       validateAsyncEventQueueCreatedInClusterConfig(regionName, false);
     });
@@ -346,7 +354,7 @@ public class CreateMappingCommandDUnitTest {
     // NOTE: --table is optional so it should not be in the output but it is. See GEODE-3468.
     gfsh.executeAndAssertThat(csb.toString()).statusIsError()
         .containsOutput(
-            "You should specify option (--table, --pdx-name, --synchronous, --id) for this command");
+            "You should specify option (--table, --pdx-name, --synchronous, --id, --catalog, --schema) for this command");
   }
 
   @Test
diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandDUnitTest.java
index 8490e42..03cc3ab 100644
--- a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandDUnitTest.java
+++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandDUnitTest.java
@@ -15,10 +15,12 @@
 package org.apache.geode.connectors.jdbc.internal.cli;
 
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__CATALOG_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__DATA_SOURCE_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__ID_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__PDX_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__REGION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__SCHEMA_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__TABLE_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.DescribeMappingCommand.DESCRIBE_MAPPING;
 import static org.apache.geode.connectors.jdbc.internal.cli.DescribeMappingCommand.DESCRIBE_MAPPING__REGION_NAME;
@@ -87,6 +89,8 @@ public class DescribeMappingCommandDUnitTest implements Serializable {
     csb.addOption(CREATE_MAPPING__TABLE_NAME, "testTable");
     csb.addOption(CREATE_MAPPING__PDX_NAME, "myPdxClass");
     csb.addOption(CREATE_MAPPING__ID_NAME, "myId");
+    csb.addOption(CREATE_MAPPING__CATALOG_NAME, "myCatalog");
+    csb.addOption(CREATE_MAPPING__SCHEMA_NAME, "mySchema");
 
     gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
 
@@ -102,6 +106,8 @@ public class DescribeMappingCommandDUnitTest implements Serializable {
     commandResultAssert.containsKeyValuePair(CREATE_MAPPING__TABLE_NAME, "testTable");
     commandResultAssert.containsKeyValuePair(CREATE_MAPPING__PDX_NAME, "myPdxClass");
     commandResultAssert.containsKeyValuePair(CREATE_MAPPING__ID_NAME, "myId");
+    commandResultAssert.containsKeyValuePair(CREATE_MAPPING__CATALOG_NAME, "myCatalog");
+    commandResultAssert.containsKeyValuePair(CREATE_MAPPING__SCHEMA_NAME, "mySchema");
   }
 
   @Test
@@ -149,7 +155,7 @@ public class DescribeMappingCommandDUnitTest implements Serializable {
     InternalCache cache = ClusterStartupRule.getCache();
     JdbcConnectorService service = cache.getService(JdbcConnectorService.class);
     service.createRegionMapping(new RegionMapping(REGION_NAME, "myPdxClass",
-        "testTable", "connection", "myId"));
+        "testTable", "connection", "myId", null, null));
     assertThat(service.getMappingForRegion(REGION_NAME)).isNotNull();
   }
 }
diff --git a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandDUnitTest.java b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandDUnitTest.java
index 6cfd1bc..bc4ce9e 100644
--- a/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandDUnitTest.java
+++ b/geode-connectors/src/distributedTest/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandDUnitTest.java
@@ -116,7 +116,7 @@ public class ListMappingCommandDUnitTest implements Serializable {
     for (int i = 1; i <= N; i++) {
       String name = regionName + "-" + i;
       service.createRegionMapping(
-          new RegionMapping(name, "x.y.MyPdxClass", "table", "connection", null));
+          new RegionMapping(name, "x.y.MyPdxClass", "table", "connection", null, null, null));
       assertThat(service.getMappingForRegion(name)).isNotNull();
     }
   }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index a09db75..ec1fdbf 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -79,8 +79,8 @@ public class SqlHandler {
     RegionMapping regionMapping = getMappingForRegion(region.getName());
     PdxInstance result;
     try (Connection connection = getConnection(regionMapping.getDataSourceName())) {
-      TableMetaDataView tableMetaData = this.tableMetaDataManager.getTableMetaDataView(connection,
-          regionMapping.getRegionToTableName(), regionMapping.getIds());
+      TableMetaDataView tableMetaData =
+          this.tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
       EntryColumnData entryColumnData =
           getEntryColumnData(tableMetaData, regionMapping, key, null, Operation.GET);
       try (PreparedStatement statement =
@@ -172,8 +172,8 @@ public class SqlHandler {
     RegionMapping regionMapping = getMappingForRegion(region.getName());
 
     try (Connection connection = getConnection(regionMapping.getDataSourceName())) {
-      TableMetaDataView tableMetaData = this.tableMetaDataManager.getTableMetaDataView(connection,
-          regionMapping.getRegionToTableName(), regionMapping.getIds());
+      TableMetaDataView tableMetaData =
+          this.tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
       EntryColumnData entryColumnData =
           getEntryColumnData(tableMetaData, regionMapping, key, value, operation);
       int updateCount = 0;
@@ -225,7 +225,7 @@ public class SqlHandler {
       Operation operation) {
     SqlStatementFactory statementFactory =
         new SqlStatementFactory(tableMetaData.getIdentifierQuoteString());
-    String tableName = tableMetaData.getTableName();
+    String tableName = tableMetaData.getQuotedTablePath();
     if (operation.isCreate()) {
       return statementFactory.createInsertSqlString(tableName, entryColumnData);
     } else if (operation.isUpdate()) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
index c2c697f..7d85ced 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactory.java
@@ -24,14 +24,14 @@ class SqlStatementFactory {
     this.quote = identifierQuoteString;
   }
 
-  String createSelectQueryString(String tableName, EntryColumnData entryColumnData) {
+  String createSelectQueryString(String quotedTablePath, EntryColumnData entryColumnData) {
     return addKeyColumnsToQuery(entryColumnData,
-        new StringBuilder("SELECT * FROM " + quoteIdentifier(tableName)));
+        new StringBuilder("SELECT * FROM ").append(quotedTablePath));
   }
 
-  String createDestroySqlString(String tableName, EntryColumnData entryColumnData) {
+  String createDestroySqlString(String quotedTablePath, EntryColumnData entryColumnData) {
     return addKeyColumnsToQuery(entryColumnData,
-        new StringBuilder("DELETE FROM " + quoteIdentifier(tableName)));
+        new StringBuilder("DELETE FROM ").append(quotedTablePath));
   }
 
   private String addKeyColumnsToQuery(EntryColumnData entryColumnData, StringBuilder queryBuilder) {
@@ -40,7 +40,7 @@ class SqlStatementFactory {
     while (iterator.hasNext()) {
       ColumnData keyColumn = iterator.next();
       boolean onLastColumn = !iterator.hasNext();
-      queryBuilder.append(quoteIdentifier(keyColumn.getColumnName())).append(" = ?");
+      queryBuilder.append(quote).append(keyColumn.getColumnName()).append(quote).append(" = ?");
       if (!onLastColumn) {
         queryBuilder.append(" AND ");
       }
@@ -48,24 +48,26 @@ class SqlStatementFactory {
     return queryBuilder.toString();
   }
 
-  String createUpdateSqlString(String tableName, EntryColumnData entryColumnData) {
-    StringBuilder query =
-        new StringBuilder("UPDATE ").append(quoteIdentifier(tableName)).append(" SET ");
+  String createUpdateSqlString(String quotedTablePath, EntryColumnData entryColumnData) {
+    StringBuilder query = new StringBuilder("UPDATE ")
+        .append(quotedTablePath)
+        .append(" SET ");
     int idx = 0;
     for (ColumnData column : entryColumnData.getEntryValueColumnData()) {
       idx++;
       if (idx > 1) {
         query.append(", ");
       }
-      query.append(quoteIdentifier(column.getColumnName()));
+      query.append(quote).append(column.getColumnName()).append(quote);
       query.append(" = ?");
     }
     return addKeyColumnsToQuery(entryColumnData, query);
   }
 
-  String createInsertSqlString(String tableName, EntryColumnData entryColumnData) {
-    StringBuilder columnNames =
-        new StringBuilder("INSERT INTO ").append(quoteIdentifier(tableName)).append(" (");
+  String createInsertSqlString(String quotedTablePath, EntryColumnData entryColumnData) {
+    StringBuilder columnNames = new StringBuilder("INSERT INTO ")
+        .append(quotedTablePath)
+        .append(" (");
     StringBuilder columnValues = new StringBuilder(" VALUES (");
     addColumnDataToSqlString(entryColumnData, columnNames, columnValues);
     columnNames.append(')');
@@ -86,12 +88,8 @@ class SqlStatementFactory {
       } else {
         firstTime[0] = false;
       }
-      columnNames.append(quoteIdentifier(column.getColumnName()));
+      columnNames.append(quote).append(column.getColumnName()).append(quote);
       columnValues.append('?');
     });
   }
-
-  private String quoteIdentifier(String identifier) {
-    return quote + identifier + quote;
-  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
index 904acba..71df7e1 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaData.java
@@ -16,27 +16,46 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class TableMetaData implements TableMetaDataView {
 
-  private final String tableName;
+  private final String quotedTablePath;
   private final List<String> keyColumnNames;
-  private final HashMap<String, Integer> columnNameToTypeMap;
+  private final Map<String, Integer> columnNameToTypeMap;
   private final String identifierQuoteString;
 
-  public TableMetaData(String tableName, List<String> keyColumnNames, String quoteString) {
-    this.tableName = tableName;
+  public TableMetaData(String catalogName, String schemaName, String tableName,
+      List<String> keyColumnNames, String quoteString, Map<String, Integer> dataTypes) {
+    if (quoteString == null) {
+      quoteString = "";
+    }
+    this.quotedTablePath = createQuotedTablePath(catalogName, schemaName, tableName, quoteString);
     this.keyColumnNames = keyColumnNames;
-    this.columnNameToTypeMap = new HashMap<>();
+    this.columnNameToTypeMap = dataTypes;
     this.identifierQuoteString = quoteString;
   }
 
+  private static String createQuotedTablePath(String catalogName, String schemaName,
+      String tableName, String quote) {
+    StringBuilder builder = new StringBuilder();
+    appendPrefix(builder, catalogName, quote);
+    appendPrefix(builder, schemaName, quote);
+    builder.append(quote).append(tableName).append(quote);
+    return builder.toString();
+  }
+
+  private static void appendPrefix(StringBuilder builder, String prefix, String quote) {
+    if (prefix != null && !prefix.isEmpty()) {
+      builder.append(quote).append(prefix).append(quote).append('.');
+    }
+  }
+
   @Override
-  public String getTableName() {
-    return tableName;
+  public String getQuotedTablePath() {
+    return quotedTablePath;
   }
 
   @Override
@@ -58,10 +77,6 @@ public class TableMetaData implements TableMetaDataView {
     return columnNameToTypeMap.keySet();
   }
 
-  public void addDataType(String columnName, int dataType) {
-    this.columnNameToTypeMap.put(columnName, dataType);
-  }
-
   @Override
   public String getIdentifierQuoteString() {
     return this.identifierQuoteString;
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
index f5f9b16..5b3ccf9 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManager.java
@@ -20,11 +20,14 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 
 /**
  * Given a tableName this manager will determine which column should correspond to the Geode Region
@@ -34,62 +37,117 @@ import org.apache.geode.connectors.jdbc.JdbcConnectorException;
  * remembered so that it does not need to be recomputed for the same table name.
  */
 public class TableMetaDataManager {
+  private static final String DEFAULT_CATALOG = "";
+  private static final String DEFAULT_SCHEMA = "";
   private final ConcurrentMap<String, TableMetaDataView> tableToMetaDataMap =
       new ConcurrentHashMap<>();
 
-  public TableMetaDataView getTableMetaDataView(Connection connection, String tableName,
-      String ids) {
-    return tableToMetaDataMap.computeIfAbsent(tableName,
-        k -> computeTableMetaDataView(connection, k, ids));
+  public TableMetaDataView getTableMetaDataView(Connection connection,
+      RegionMapping regionMapping) {
+    return tableToMetaDataMap.computeIfAbsent(computeTableName(regionMapping),
+        k -> computeTableMetaDataView(connection, k, regionMapping));
   }
 
-  private TableMetaDataView computeTableMetaDataView(Connection connection, String tableName,
-      String ids) {
-    TableMetaData result;
+  /**
+   * If the region mapping has been given a table name then return it.
+   * Otherwise return the region mapping's region name as the table name.
+   */
+  String computeTableName(RegionMapping regionMapping) {
+    String result = regionMapping.getTableName();
+    if (result == null) {
+      result = regionMapping.getRegionName();
+    }
+    return result;
+  }
+
+  private TableMetaDataView computeTableMetaDataView(Connection connection,
+      String tableName, RegionMapping regionMapping) {
     try {
       DatabaseMetaData metaData = connection.getMetaData();
-      try (ResultSet tables = metaData.getTables(null, null, "%", null)) {
-        String realTableName = getTableNameFromMetaData(tableName, tables);
-        List<String> keys = getPrimaryKeyColumnNamesFromMetaData(realTableName, metaData, ids);
-        String quoteString = metaData.getIdentifierQuoteString();
-        if (quoteString == null) {
-          quoteString = "";
-        }
-        result = new TableMetaData(realTableName, keys, quoteString);
-        getDataTypesFromMetaData(realTableName, metaData, result);
-      }
+      String realCatalogName = getCatalogNameFromMetaData(metaData, regionMapping);
+      String realSchemaName = getSchemaNameFromMetaData(metaData, regionMapping, realCatalogName);
+      String realTableName =
+          getTableNameFromMetaData(metaData, realCatalogName, realSchemaName, tableName);
+      List<String> keys = getPrimaryKeyColumnNamesFromMetaData(metaData, realCatalogName,
+          realSchemaName, realTableName, regionMapping.getIds());
+      String quoteString = metaData.getIdentifierQuoteString();
+      Map<String, Integer> dataTypes =
+          getDataTypesFromMetaData(metaData, realCatalogName, realSchemaName, realTableName);
+      return new TableMetaData(realCatalogName, realSchemaName, realTableName, keys, quoteString,
+          dataTypes);
     } catch (SQLException e) {
       throw JdbcConnectorException.createException(e);
     }
-    return result;
   }
 
-  private String getTableNameFromMetaData(String tableName, ResultSet tables) throws SQLException {
-    String result = null;
-    int inexactMatches = 0;
+  String getCatalogNameFromMetaData(DatabaseMetaData metaData, RegionMapping regionMapping)
+      throws SQLException {
+    String catalogFilter = regionMapping.getCatalog();
+    if (catalogFilter == null || catalogFilter.isEmpty()) {
+      return DEFAULT_CATALOG;
+    }
+    try (ResultSet catalogs = metaData.getCatalogs()) {
+      return findMatchInResultSet(catalogFilter, catalogs, "TABLE_CAT", "catalog");
+    }
+  }
 
-    while (tables.next()) {
-      String name = tables.getString("TABLE_NAME");
-      if (name.equals(tableName)) {
-        return name;
-      } else if (name.equalsIgnoreCase(tableName)) {
-        inexactMatches++;
-        result = name;
+  String getSchemaNameFromMetaData(DatabaseMetaData metaData, RegionMapping regionMapping,
+      String catalogFilter) throws SQLException {
+    String schemaFilter = regionMapping.getSchema();
+    if (schemaFilter == null || schemaFilter.isEmpty()) {
+      if ("PostgreSQL".equals(metaData.getDatabaseProductName())) {
+        schemaFilter = "public";
+      } else {
+        return DEFAULT_SCHEMA;
       }
     }
+    try (ResultSet schemas = metaData.getSchemas(catalogFilter, "%")) {
+      return findMatchInResultSet(schemaFilter, schemas, "TABLE_SCHEM", "schema");
+    }
+  }
 
-    if (inexactMatches > 1) {
-      throw new JdbcConnectorException("Duplicate tables that match region name");
+  private String getTableNameFromMetaData(DatabaseMetaData metaData, String catalogFilter,
+      String schemaFilter, String tableName) throws SQLException {
+    try (ResultSet tables = metaData.getTables(catalogFilter, schemaFilter, "%", null)) {
+      return findMatchInResultSet(tableName, tables, "TABLE_NAME", "table");
     }
+  }
 
-    if (result == null) {
-      throw new JdbcConnectorException("no table was found that matches " + tableName);
+  String findMatchInResultSet(String stringToFind, ResultSet resultSet, String column,
+      String description)
+      throws SQLException {
+    int exactMatches = 0;
+    String exactMatch = null;
+    int inexactMatches = 0;
+    String inexactMatch = null;
+    if (resultSet != null) {
+      while (resultSet.next()) {
+        String name = resultSet.getString(column);
+        if (name.equals(stringToFind)) {
+          exactMatches++;
+          exactMatch = name;
+        } else if (name.equalsIgnoreCase(stringToFind)) {
+          inexactMatches++;
+          inexactMatch = name;
+        }
+      }
     }
-    return result;
+    if (exactMatches == 1) {
+      return exactMatch;
+    }
+    if (inexactMatches > 1 || exactMatches > 1) {
+      throw new JdbcConnectorException(
+          "Multiple " + description + "s were found that match \"" + stringToFind + '"');
+    }
+    if (inexactMatches == 1) {
+      return inexactMatch;
+    }
+    throw new JdbcConnectorException(
+        "No " + description + " was found that matches \"" + stringToFind + '"');
   }
 
-  private List<String> getPrimaryKeyColumnNamesFromMetaData(String tableName,
-      DatabaseMetaData metaData,
+  private List<String> getPrimaryKeyColumnNamesFromMetaData(DatabaseMetaData metaData,
+      String catalogFilter, String schemaFilter, String tableName,
       String ids)
       throws SQLException {
     List<String> keys = new ArrayList<>();
@@ -97,10 +155,12 @@ public class TableMetaDataManager {
     if (ids != null && !ids.isEmpty()) {
       keys.addAll(Arrays.asList(ids.split(",")));
       for (String key : keys) {
-        checkColumnExistsInTable(tableName, metaData, key);
+        checkColumnExistsInTable(tableName, metaData, catalogFilter, schemaFilter, key);
       }
     } else {
-      try (ResultSet primaryKeys = metaData.getPrimaryKeys(null, null, tableName)) {
+      try (
+          ResultSet primaryKeys =
+              metaData.getPrimaryKeys(catalogFilter, schemaFilter, tableName)) {
         while (primaryKeys.next()) {
           String key = primaryKeys.getString("COLUMN_NAME");
           keys.add(key);
@@ -114,21 +174,26 @@ public class TableMetaDataManager {
     return keys;
   }
 
-  private void getDataTypesFromMetaData(String tableName, DatabaseMetaData metaData,
-      TableMetaData result) throws SQLException {
-    try (ResultSet columnData = metaData.getColumns(null, null, tableName, "%")) {
+  private Map<String, Integer> getDataTypesFromMetaData(DatabaseMetaData metaData,
+      String catalogFilter,
+      String schemaFilter, String tableName) throws SQLException {
+    Map<String, Integer> result = new HashMap<>();
+    try (ResultSet columnData =
+        metaData.getColumns(catalogFilter, schemaFilter, tableName, "%")) {
       while (columnData.next()) {
         String columnName = columnData.getString("COLUMN_NAME");
         int dataType = columnData.getInt("DATA_TYPE");
-        result.addDataType(columnName, dataType);
+        result.put(columnName, dataType);
       }
     }
+    return result;
   }
 
   private void checkColumnExistsInTable(String tableName, DatabaseMetaData metaData,
-      String columnName) throws SQLException {
+      String catalogFilter, String schemaFilter, String columnName) throws SQLException {
     int caseInsensitiveMatches = 0;
-    try (ResultSet columnData = metaData.getColumns(null, null, tableName, "%")) {
+    try (ResultSet columnData =
+        metaData.getColumns(catalogFilter, schemaFilter, tableName, "%")) {
       while (columnData.next()) {
         String realColumnName = columnData.getString("COLUMN_NAME");
         if (columnName.equals(realColumnName)) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
index 3c18dd4..e897dea 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataView.java
@@ -21,7 +21,12 @@ import java.util.List;
 import java.util.Set;
 
 public interface TableMetaDataView {
-  String getTableName();
+  /**
+   * The path includes the quoted catalog and schema prefixes when they are not empty.
+   *
+   * @return the fully qualified, quoted table path
+   */
+  String getQuotedTablePath();
 
   List<String> getKeyColumnNames();
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
index 4f8a0ab..6f73e2b 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommand.java
@@ -66,6 +66,12 @@ public class CreateMappingCommand extends SingleGfshCommand {
   static final String CREATE_MAPPING__ID_NAME = "id";
   static final String CREATE_MAPPING__ID_NAME__HELP =
       "The table column names to use as the region key for this JDBC mapping. If more than one column name is given then they must be separated by commas.";
+  static final String CREATE_MAPPING__CATALOG_NAME = "catalog";
+  static final String CREATE_MAPPING__CATALOG_NAME__HELP =
+      "The catalog that contains the database table. By default, the catalog is the empty string causing the table to be referenced without a catalog prefix.";
+  static final String CREATE_MAPPING__SCHEMA_NAME = "schema";
+  static final String CREATE_MAPPING__SCHEMA_NAME__HELP =
+      "The schema that contains the database table. By default, the schema is the empty string causing the table to be referenced without a schema prefix.";
 
   public static String createAsyncEventQueueName(String regionPath) {
     if (regionPath.startsWith("/")) {
@@ -90,15 +96,19 @@ public class CreateMappingCommand extends SingleGfshCommand {
       @CliOption(key = CREATE_MAPPING__SYNCHRONOUS_NAME,
           help = CREATE_MAPPING__SYNCHRONOUS_NAME__HELP,
           specifiedDefaultValue = "true", unspecifiedDefaultValue = "false") boolean synchronous,
-      @CliOption(key = CREATE_MAPPING__ID_NAME,
-          help = CREATE_MAPPING__ID_NAME__HELP) String id) {
+      @CliOption(key = CREATE_MAPPING__ID_NAME, help = CREATE_MAPPING__ID_NAME__HELP) String id,
+      @CliOption(key = CREATE_MAPPING__CATALOG_NAME,
+          help = CREATE_MAPPING__CATALOG_NAME__HELP) String catalog,
+      @CliOption(key = CREATE_MAPPING__SCHEMA_NAME,
+          help = CREATE_MAPPING__SCHEMA_NAME__HELP) String schema) {
     if (regionName.startsWith("/")) {
       regionName = regionName.substring(1);
     }
 
     // input
     Set<DistributedMember> targetMembers = findMembersForRegion(regionName);
-    RegionMapping mapping = new RegionMapping(regionName, pdxName, table, dataSourceName, id);
+    RegionMapping mapping =
+        new RegionMapping(regionName, pdxName, table, dataSourceName, id, catalog, schema);
 
     try {
       ConfigurationPersistenceService configurationPersistenceService =
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java
index 038c31c..81cdfcb 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java
@@ -14,10 +14,12 @@
  */
 package org.apache.geode.connectors.jdbc.internal.cli;
 
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__CATALOG_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__DATA_SOURCE_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__ID_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__PDX_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__REGION_NAME;
+import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__SCHEMA_NAME;
 import static org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand.CREATE_MAPPING__TABLE_NAME;
 
 import java.util.Set;
@@ -91,6 +93,8 @@ public class DescribeMappingCommand extends GfshCommand {
     sectionModel.addData(CREATE_MAPPING__TABLE_NAME, mapping.getTableName());
     sectionModel.addData(CREATE_MAPPING__PDX_NAME, mapping.getPdxName());
     sectionModel.addData(CREATE_MAPPING__ID_NAME, mapping.getIds());
+    sectionModel.addData(CREATE_MAPPING__CATALOG_NAME, mapping.getCatalog());
+    sectionModel.addData(CREATE_MAPPING__SCHEMA_NAME, mapping.getSchema());
   }
 
   @CliAvailabilityIndicator({DESCRIBE_MAPPING})
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java
index 28bbacb..cc668df 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java
@@ -49,6 +49,8 @@ import org.apache.geode.pdx.internal.TypeRegistry;
  *       &lt;attribute name="table" type="{http://www.w3.org/2001/XMLSchema}string" />
  *       &lt;attribute name="pdx-name" type="{http://www.w3.org/2001/XMLSchema}string" />
  *       &lt;attribute name="ids" type="{http://www.w3.org/2001/XMLSchema}string" />
+ *       &lt;attribute name="catalog" type="{http://www.w3.org/2001/XMLSchema}string" />
+ *       &lt;attribute name="schema" type="{http://www.w3.org/2001/XMLSchema}string" />
  *     &lt;/restriction>
  *   &lt;/complexContent>
  * &lt;/complexType>
@@ -72,6 +73,10 @@ public class RegionMapping implements CacheElement {
   protected String pdxName;
   @XmlAttribute(name = "ids")
   protected String ids;
+  @XmlAttribute(name = "catalog")
+  protected String catalog;
+  @XmlAttribute(name = "schema")
+  protected String schema;
 
   @XmlTransient
   protected String regionName;
@@ -81,12 +86,14 @@ public class RegionMapping implements CacheElement {
   public RegionMapping() {}
 
   public RegionMapping(String regionName, String pdxName, String tableName,
-      String dataSourceName, String ids) {
+      String dataSourceName, String ids, String catalog, String schema) {
     this.regionName = regionName;
     this.pdxName = pdxName;
     this.tableName = tableName;
     this.dataSourceName = dataSourceName;
     this.ids = ids;
+    this.catalog = catalog;
+    this.schema = schema;
   }
 
   public void setDataSourceName(String dataSourceName) {
@@ -109,6 +116,14 @@ public class RegionMapping implements CacheElement {
     this.ids = ids;
   }
 
+  public void setCatalog(String catalog) {
+    this.catalog = catalog;
+  }
+
+  public void setSchema(String schema) {
+    this.schema = schema;
+  }
+
   public String getDataSourceName() {
     return dataSourceName;
   }
@@ -125,14 +140,15 @@ public class RegionMapping implements CacheElement {
     return ids;
   }
 
-  public String getTableName() {
-    return tableName;
+  public String getCatalog() {
+    return catalog;
   }
 
-  public String getRegionToTableName() {
-    if (tableName == null) {
-      return regionName;
-    }
+  public String getSchema() {
+    return schema;
+  }
+
+  public String getTableName() {
     return tableName;
   }
 
@@ -229,23 +245,17 @@ public class RegionMapping implements CacheElement {
 
     RegionMapping that = (RegionMapping) o;
 
-    if (regionName != null ? !regionName.equals(that.regionName) : that.regionName != null) {
-      return false;
-    }
-    if (!pdxName.equals(that.pdxName)) {
-      return false;
-    }
-    if (tableName != null ? !tableName.equals(that.tableName) : that.tableName != null) {
-      return false;
-    }
-    if (dataSourceName != null ? !dataSourceName.equals(that.dataSourceName)
-        : that.dataSourceName != null) {
-      return false;
-    }
-    if (ids != null ? !ids.equals(that.ids) : that.ids != null) {
-      return false;
-    }
-    return true;
+    return isEqual(regionName, that.regionName)
+        && isEqual(pdxName, that.pdxName)
+        && isEqual(tableName, that.tableName)
+        && isEqual(dataSourceName, that.dataSourceName)
+        && isEqual(ids, that.ids)
+        && isEqual(catalog, that.catalog)
+        && isEqual(schema, that.schema);
+  }
+
+  private static boolean isEqual(String s1, String s2) {
+    return s1 != null ? s1.equals(s2) : s2 == null;
   }
 
   @Override
@@ -255,6 +265,8 @@ public class RegionMapping implements CacheElement {
     result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
     result = 31 * result + (dataSourceName != null ? dataSourceName.hashCode() : 0);
     result = 31 * result + (ids != null ? ids.hashCode() : 0);
+    result = 31 * result + (catalog != null ? catalog.hashCode() : 0);
+    result = 31 * result + (schema != null ? schema.hashCode() : 0);
     return result;
   }
 
@@ -266,6 +278,8 @@ public class RegionMapping implements CacheElement {
         + ", tableName='" + tableName + '\''
         + ", dataSourceName='" + dataSourceName + '\''
         + ", ids='" + ids + '\''
+        + ", catalog='" + catalog + '\''
+        + ", schema='" + schema + '\''
         + '}';
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java
index 46eca88..c4316ca 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/ElementType.java
@@ -38,6 +38,8 @@ public enum ElementType {
       mapping.setTableName(attributes.getValue(JdbcConnectorServiceXmlParser.TABLE));
       mapping.setPdxName(attributes.getValue(JdbcConnectorServiceXmlParser.PDX_NAME));
       mapping.setIds(attributes.getValue(JdbcConnectorServiceXmlParser.IDS));
+      mapping.setCatalog(attributes.getValue(JdbcConnectorServiceXmlParser.CATALOG));
+      mapping.setSchema(attributes.getValue(JdbcConnectorServiceXmlParser.SCHEMA));
       stack.push(mapping);
     }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java
index abef62c..900c012 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/JdbcConnectorServiceXmlParser.java
@@ -25,6 +25,8 @@ public class JdbcConnectorServiceXmlParser extends AbstractXmlParser {
   static final String TABLE = "table";
   static final String PDX_NAME = "pdx-name";
   static final String IDS = "ids";
+  static final String CATALOG = "catalog";
+  static final String SCHEMA = "schema";
 
   @Override
   public String getNamespaceUri() {
diff --git a/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd b/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd
index b225a4d..366fb81 100644
--- a/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd
+++ b/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd
@@ -44,6 +44,8 @@ XML schema for JDBC Connector Service in Geode.
                 <xsd:attribute type="xsd:string" name="table" use="optional"/>
                 <xsd:attribute type="xsd:string" name="pdx-name" use="required"/>
                 <xsd:attribute type="xsd:string" name="ids" use="optional"/>
+                <xsd:attribute type="xsd:string" name="catalog" use="optional"/>
+                <xsd:attribute type="xsd:string" name="schema" use="optional"/>
             </xsd:complexType>
         </xsd:element>
 </xsd:schema>
diff --git a/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt b/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
index de159fb..83029fd 100755
--- a/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
+++ b/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
@@ -5,4 +5,4 @@ org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction,false
 org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingFunction,false
 org/apache/geode/connectors/jdbc/internal/cli/DestroyMappingFunction,false
 org/apache/geode/connectors/jdbc/internal/cli/ListMappingFunction,false
-org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping,false,dataSourceName:java/lang/String,ids:java/lang/String,pdxName:java/lang/String,regionName:java/lang/String,tableName:java/lang/String
+org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping,false,catalog:java/lang/String,dataSourceName:java/lang/String,ids:java/lang/String,pdxName:java/lang/String,regionName:java/lang/String,schema:java/lang/String,tableName:java/lang/String
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
index 4eee89f..cd29c4b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
@@ -40,9 +40,6 @@ public class RegionMappingTest {
 
   private String name;
   private String fieldName1;
-  private String columnName1;
-  private String fieldName2;
-  private String columnName2;
 
   private RegionMapping mapping;
 
@@ -50,60 +47,47 @@ public class RegionMappingTest {
   public void setUp() {
     name = "name";
     fieldName1 = "myField1";
-    columnName1 = "myfield1";
-    fieldName2 = "myField2";
-    columnName2 = "MYFIELD2";
   }
 
   @Test
   public void initiatedWithNullValues() {
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
 
     assertThat(mapping.getTableName()).isNull();
     assertThat(mapping.getRegionName()).isNull();
     assertThat(mapping.getDataSourceName()).isNull();
     assertThat(mapping.getPdxName()).isEqualTo("pdxClassName");
-    assertThat(mapping.getRegionToTableName()).isNull();
     assertThat(mapping.getIds()).isNull();
+    assertThat(mapping.getCatalog()).isNull();
+    assertThat(mapping.getSchema()).isNull();
     assertThat(mapping.getColumnNameForField("fieldName", mock(TableMetaDataView.class)))
         .isEqualTo("fieldName");
   }
 
   @Test
   public void hasCorrectTableName() {
-    mapping = new RegionMapping(null, null, name, null, null);
+    mapping = new RegionMapping(null, null, name, null, null, null, null);
 
     assertThat(mapping.getTableName()).isEqualTo(name);
-    assertThat(mapping.getRegionToTableName()).isEqualTo(name);
-  }
-
-  @Test
-  public void hasCorrectTableNameWhenRegionNameIsSet() {
-    mapping = new RegionMapping("regionName", null, "tableName", null, null);
-
-    assertThat(mapping.getRegionName()).isEqualTo("regionName");
-    assertThat(mapping.getTableName()).isEqualTo("tableName");
-    assertThat(mapping.getRegionToTableName()).isEqualTo("tableName");
   }
 
   @Test
   public void hasCorrectRegionName() {
-    mapping = new RegionMapping(name, null, null, null, null);
+    mapping = new RegionMapping(name, null, null, null, null, null, null);
 
     assertThat(mapping.getRegionName()).isEqualTo(name);
-    assertThat(mapping.getRegionToTableName()).isEqualTo(name);
   }
 
   @Test
   public void hasCorrectConfigName() {
-    mapping = new RegionMapping(null, null, null, name, null);
+    mapping = new RegionMapping(null, null, null, name, null, null, null);
 
     assertThat(mapping.getDataSourceName()).isEqualTo(name);
   }
 
   @Test
   public void hasCorrectPdxClassName() {
-    mapping = new RegionMapping(null, name, null, null, null);
+    mapping = new RegionMapping(null, name, null, null, null, null, null);
 
     assertThat(mapping.getPdxName()).isEqualTo(name);
   }
@@ -111,14 +95,30 @@ public class RegionMappingTest {
   @Test
   public void hasCorrectIds() {
     String ids = "ids";
-    mapping = new RegionMapping(null, null, null, null, ids);
+    mapping = new RegionMapping(null, null, null, null, ids, null, null);
 
     assertThat(mapping.getIds()).isEqualTo(ids);
   }
 
   @Test
+  public void hasCorrectCatalog() {
+    String catalog = "catalog";
+    mapping = new RegionMapping(null, null, null, null, null, catalog, null);
+
+    assertThat(mapping.getCatalog()).isEqualTo(catalog);
+  }
+
+  @Test
+  public void hasCorrectSchema() {
+    String schema = "schema";
+    mapping = new RegionMapping(null, null, null, null, null, null, schema);
+
+    assertThat(mapping.getSchema()).isEqualTo(schema);
+  }
+
+  @Test
   public void returnsColumnNameIfFieldNotMapped() {
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
 
     String columnName = mapping.getColumnNameForField(fieldName1, mock(TableMetaDataView.class));
 
@@ -128,7 +128,7 @@ public class RegionMappingTest {
   @Test
   public void returnsColumnNameFromTableMetaDataIfFieldNotMappedAndMetaDataMatchesWithCaseDiffering() {
     String metaDataColumnName = fieldName1.toUpperCase();
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
     TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
     when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton(metaDataColumnName));
 
@@ -139,7 +139,7 @@ public class RegionMappingTest {
   @Test
   public void returnsColumnNameFromTableMetaDataIfFieldNotMappedAndMetaDataMatchesExactly() {
     String metaDataColumnName = fieldName1;
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
     TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
     when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton(metaDataColumnName));
 
@@ -149,7 +149,7 @@ public class RegionMappingTest {
 
   @Test
   public void returnsColumnNameIfFieldNotMappedAndNotInMetaData() {
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
     TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
     when(tableMetaDataView.getColumnNames()).thenReturn(Collections.singleton("does not match"));
 
@@ -158,7 +158,7 @@ public class RegionMappingTest {
 
   @Test
   public void getColumnNameForFieldThrowsIfTwoColumnsMatchField() {
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
 
     TableMetaDataView tableMetaDataView = mock(TableMetaDataView.class);
     HashSet<String> columnNames =
@@ -174,7 +174,7 @@ public class RegionMappingTest {
 
   @Test
   public void throwsIfColumnNotMappedAndPdxClassNameDoesNotExist() {
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
     TypeRegistry typeRegistry = mock(TypeRegistry.class);
     when(typeRegistry.getPdxTypesForClassName("pdxClassName")).thenReturn(Collections.emptySet());
     expectedException.expect(JdbcConnectorException.class);
@@ -187,7 +187,7 @@ public class RegionMappingTest {
   public void throwsIfColumnNotMappedAndPdxClassNameDoesExistButHasNoMatchingFields() {
     String pdxClassName = "pdxClassName";
     String columnName = "columnName";
-    mapping = new RegionMapping(null, pdxClassName, null, null, null);
+    mapping = new RegionMapping(null, pdxClassName, null, null, null, null, null);
     TypeRegistry typeRegistry = mock(TypeRegistry.class);
     HashSet<PdxType> pdxTypes = new HashSet<>(Arrays.asList(mock(PdxType.class)));
     when(typeRegistry.getPdxTypesForClassName(pdxClassName)).thenReturn(pdxTypes);
@@ -202,7 +202,7 @@ public class RegionMappingTest {
   public void throwsIfColumnNotMappedAndPdxClassNameDoesExistButHasMoreThanOneMatchingFields() {
     String pdxClassName = "pdxClassName";
     String columnName = "columnName";
-    mapping = new RegionMapping(null, pdxClassName, null, null, null);
+    mapping = new RegionMapping(null, pdxClassName, null, null, null, null, null);
     TypeRegistry typeRegistry = mock(TypeRegistry.class);
     PdxType pdxType = mock(PdxType.class);
     when(pdxType.getFieldNames())
@@ -220,7 +220,7 @@ public class RegionMappingTest {
   public void returnsIfColumnNotMappedAndPdxClassNameDoesExistAndHasOneFieldThatInexactlyMatches() {
     String pdxClassName = "pdxClassName";
     String columnName = "columnName";
-    mapping = new RegionMapping(null, pdxClassName, null, null, null);
+    mapping = new RegionMapping(null, pdxClassName, null, null, null, null, null);
     TypeRegistry typeRegistry = mock(TypeRegistry.class);
     PdxType pdxType = mock(PdxType.class);
     when(pdxType.getFieldNames())
@@ -236,7 +236,7 @@ public class RegionMappingTest {
   public void returnsIfColumnNotMappedAndPdxClassNameDoesExistAndHasOneFieldThatExactlyMatches() {
     String pdxClassName = "pdxClassName";
     String columnName = "columnName";
-    mapping = new RegionMapping(null, pdxClassName, null, null, null);
+    mapping = new RegionMapping(null, pdxClassName, null, null, null, null, null);
     TypeRegistry typeRegistry = mock(TypeRegistry.class);
     PdxType pdxType = mock(PdxType.class);
     when(pdxType.getPdxField(columnName)).thenReturn(mock(PdxField.class));
@@ -249,9 +249,9 @@ public class RegionMappingTest {
   @Test
   public void verifyTwoDefaultInstancesAreEqual() {
     RegionMapping rm1 =
-        new RegionMapping("regionName", "pdxClassName", null, "dataSourceName", null);
+        new RegionMapping("regionName", "pdxClassName", null, "dataSourceName", null, null, null);
     RegionMapping rm2 =
-        new RegionMapping("regionName", "pdxClassName", null, "dataSourceName", null);
+        new RegionMapping("regionName", "pdxClassName", null, "dataSourceName", null, null, null);
     assertThat(rm1).isEqualTo(rm2);
   }
 
@@ -259,17 +259,39 @@ public class RegionMappingTest {
   @Test
   public void verifyTwoInstancesThatAreEqualHaveSameHashCode() {
     RegionMapping rm1 = new RegionMapping("regionName",
-        "pdxClassName", "tableName", "dataSourceName", "ids");
+        "pdxClassName", "tableName", "dataSourceName", "ids", "catalog", "schema");
 
     RegionMapping rm2 = new RegionMapping("regionName",
-        "pdxClassName", "tableName", "dataSourceName", "ids");
+        "pdxClassName", "tableName", "dataSourceName", "ids", "catalog", "schema");
 
     assertThat(rm1.hashCode()).isEqualTo(rm2.hashCode());
   }
 
   @Test
+  public void verifyToStringGivenAllAttributes() {
+    RegionMapping rm = new RegionMapping("regionName", "pdxClassName", "tableName",
+        "dataSourceName", "ids", "catalog", "schema");
+
+    String result = rm.toString();
+
+    assertThat(result).isEqualTo(
+        "RegionMapping{regionName='regionName', pdxName='pdxClassName', tableName='tableName', dataSourceName='dataSourceName', ids='ids', catalog='catalog', schema='schema'}");
+  }
+
+  @Test
+  public void verifyToStringGivenRequiredAttributes() {
+    RegionMapping rm =
+        new RegionMapping("regionName", "pdxClassName", null, "dataSourceName", null, null, null);
+
+    String result = rm.toString();
+
+    assertThat(result).isEqualTo(
+        "RegionMapping{regionName='regionName', pdxName='pdxClassName', tableName='null', dataSourceName='dataSourceName', ids='null', catalog='null', schema='null'}");
+  }
+
+  @Test
   public void verifyThatMappingIsEqualToItself() {
-    mapping = new RegionMapping(null, "pdxClassName", null, null, null);
+    mapping = new RegionMapping(null, "pdxClassName", null, null, null, null, null);
     boolean result = mapping.equals(mapping);
     assertThat(mapping.hashCode()).isEqualTo(mapping.hashCode());
     assertThat(result).isTrue();
@@ -277,14 +299,14 @@ public class RegionMappingTest {
 
   @Test
   public void verifyThatNullIsNotEqual() {
-    mapping = new RegionMapping(null, null, null, null, null);
+    mapping = new RegionMapping(null, null, null, null, null, null, null);
     boolean result = mapping.equals(null);
     assertThat(result).isFalse();
   }
 
   @Test
   public void verifyOtherClassIsNotEqual() {
-    mapping = new RegionMapping(null, null, null, null, null);
+    mapping = new RegionMapping(null, null, null, null, null, null, null);
     boolean result = mapping.equals("not equal");
     assertThat(result).isFalse();
   }
@@ -292,9 +314,9 @@ public class RegionMappingTest {
   @Test
   public void verifyMappingWithDifferentRegionNamesAreNotEqual() {
     RegionMapping rm1 =
-        new RegionMapping(null, null, null, null, null);
+        new RegionMapping(null, null, null, null, null, null, null);
     RegionMapping rm2 =
-        new RegionMapping("name", null, null, null, null);
+        new RegionMapping("name", null, null, null, null, null, null);
     boolean result = rm1.equals(rm2);
     assertThat(result).isFalse();
   }
@@ -302,9 +324,29 @@ public class RegionMappingTest {
   @Test
   public void verifyMappingWithDifferentPdxClassNameAreNotEqual() {
     RegionMapping rm1 =
-        new RegionMapping(null, "pdxClassName", null, null, null);
+        new RegionMapping(null, "pdxClassName", null, null, null, null, null);
+    RegionMapping rm2 =
+        new RegionMapping(null, "pdxClass", null, null, null, null, null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithDifferentTablesAreNotEqual() {
+    RegionMapping rm1 =
+        new RegionMapping(null, "pdxClassName", "table1", null, null, null, null);
     RegionMapping rm2 =
-        new RegionMapping(null, "pdxClass", null, null, null);
+        new RegionMapping(null, "pdxClassName", "table2", null, null, null, null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithDifferentDataSourcesAreNotEqual() {
+    RegionMapping rm1 =
+        new RegionMapping(null, "pdxClassName", null, "datasource1", null, null, null);
+    RegionMapping rm2 =
+        new RegionMapping(null, "pdxClassName", null, "datasource2", null, null, null);
     boolean result = rm1.equals(rm2);
     assertThat(result).isFalse();
   }
@@ -312,9 +354,29 @@ public class RegionMappingTest {
   @Test
   public void verifyMappingWithDifferentIdsAreNotEqual() {
     RegionMapping rm1 =
-        new RegionMapping(null, "pdxClassName", null, null, "ids1");
+        new RegionMapping(null, "pdxClassName", null, null, "ids1", null, null);
+    RegionMapping rm2 =
+        new RegionMapping(null, "pdxClassName", null, null, "ids2", null, null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithDifferentCatalogsAreNotEqual() {
+    RegionMapping rm1 =
+        new RegionMapping(null, "pdxClassName", null, null, null, "catalog1", null);
+    RegionMapping rm2 =
+        new RegionMapping(null, "pdxClassName", null, null, null, "catalog2", null);
+    boolean result = rm1.equals(rm2);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void verifyMappingWithDifferentSchemasAreNotEqual() {
+    RegionMapping rm1 =
+        new RegionMapping(null, "pdxClassName", null, null, null, null, "schema1");
     RegionMapping rm2 =
-        new RegionMapping(null, "pdxClassName", null, null, "ids2");
+        new RegionMapping(null, "pdxClassName", null, null, null, null, "schema2");
     boolean result = rm1.equals(rm2);
     assertThat(result).isFalse();
   }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index ca25888..a9303ad 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -90,10 +90,10 @@ public class SqlHandlerTest {
     when(region.getRegionService()).thenReturn(cache);
     tableMetaDataManager = mock(TableMetaDataManager.class);
     tableMetaDataView = mock(TableMetaDataView.class);
-    when(tableMetaDataView.getTableName()).thenReturn(TABLE_NAME);
+    when(tableMetaDataView.getQuotedTablePath()).thenReturn(TABLE_NAME);
     when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList(KEY_COLUMN));
     final String IDS = "ids";
-    when(tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, IDS))
+    when(tableMetaDataManager.getTableMetaDataView(any(), any()))
         .thenReturn(tableMetaDataView);
     connectorService = mock(JdbcConnectorService.class);
     dataSourceFactory = mock(DataSourceFactory.class);
@@ -108,7 +108,6 @@ public class SqlHandlerTest {
     when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
     when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
     when(regionMapping.getIds()).thenReturn(IDS);
-    when(regionMapping.getRegionToTableName()).thenReturn(TABLE_NAME);
     when(connectorService.getMappingForRegion(REGION_NAME)).thenReturn(regionMapping);
 
     when(dataSource.getConnection()).thenReturn(this.connection);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
index 0a13981..22d8853 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlStatementFactoryTest.java
@@ -25,7 +25,8 @@ import org.junit.Test;
 
 public class SqlStatementFactoryTest {
 
-  private static final String TABLE_NAME = "testTable";
+  private static final String QUOTE = "@@";
+  private static final String QUOTED_TABLE_PATH = QUOTE + "testTable" + QUOTE;
   private static final String KEY_COLUMN_1_NAME = "keyColumn1";
   private static final String KEY_COLUMN_2_NAME = "keyColumn2";
   private static final String VALUE_COLUMN_1_NAME = "valueColumn1";
@@ -34,7 +35,11 @@ public class SqlStatementFactoryTest {
   private final List<ColumnData> valueColumnData = new ArrayList<>();
 
   private EntryColumnData entryColumnData;
-  private SqlStatementFactory factory = new SqlStatementFactory("");
+  private SqlStatementFactory factory = new SqlStatementFactory(QUOTE);
+
+  private String quoted(String id) {
+    return QUOTE + id + QUOTE;
+  }
 
   @Before
   public void setup() {
@@ -47,9 +52,10 @@ public class SqlStatementFactoryTest {
   @Test
   public void getSelectQueryString() throws Exception {
     String expectedStatement =
-        String.format("SELECT * FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME);
+        String.format("SELECT * FROM %s WHERE %s = ?", QUOTED_TABLE_PATH,
+            quoted(KEY_COLUMN_1_NAME));
 
-    String statement = factory.createSelectQueryString(TABLE_NAME, entryColumnData);
+    String statement = factory.createSelectQueryString(QUOTED_TABLE_PATH, entryColumnData);
 
     assertThat(statement).isEqualTo(expectedStatement);
   }
@@ -57,9 +63,9 @@ public class SqlStatementFactoryTest {
   @Test
   public void getDestroySqlString() throws Exception {
     String expectedStatement =
-        String.format("DELETE FROM %s WHERE %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME);
+        String.format("DELETE FROM %s WHERE %s = ?", QUOTED_TABLE_PATH, quoted(KEY_COLUMN_1_NAME));
 
-    String statement = factory.createDestroySqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createDestroySqlString(QUOTED_TABLE_PATH, entryColumnData);
 
     assertThat(statement).isEqualTo(expectedStatement);
   }
@@ -67,9 +73,10 @@ public class SqlStatementFactoryTest {
   @Test
   public void getUpdateSqlString() throws Exception {
     String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ?",
-        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME);
+        QUOTED_TABLE_PATH, quoted(VALUE_COLUMN_1_NAME), quoted(VALUE_COLUMN_2_NAME),
+        quoted(KEY_COLUMN_1_NAME));
 
-    String statement = factory.createUpdateSqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createUpdateSqlString(QUOTED_TABLE_PATH, entryColumnData);
 
     assertThat(statement).isEqualTo(expectedStatement);
   }
@@ -77,9 +84,10 @@ public class SqlStatementFactoryTest {
   @Test
   public void getInsertSqlString() throws Exception {
     String expectedStatement = String.format("INSERT INTO %s (%s,%s,%s) VALUES (?,?,?)",
-        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME);
+        QUOTED_TABLE_PATH, quoted(VALUE_COLUMN_1_NAME), quoted(VALUE_COLUMN_2_NAME),
+        quoted(KEY_COLUMN_1_NAME));
 
-    String statement = factory.createInsertSqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createInsertSqlString(QUOTED_TABLE_PATH, entryColumnData);
 
     assertThat(statement).isEqualTo(expectedStatement);
   }
@@ -89,9 +97,9 @@ public class SqlStatementFactoryTest {
     valueColumnData.clear();
     keyColumnData.clear();
 
-    String statement = factory.createInsertSqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createInsertSqlString(QUOTED_TABLE_PATH, entryColumnData);
 
-    String expectedStatement = String.format("INSERT INTO %s () VALUES ()", TABLE_NAME);
+    String expectedStatement = String.format("INSERT INTO %s () VALUES ()", QUOTED_TABLE_PATH);
     assertThat(statement).isEqualTo(expectedStatement);
   }
 
@@ -99,10 +107,11 @@ public class SqlStatementFactoryTest {
   public void getInsertSqlStringGivenMultipleKeys() throws Exception {
     keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
 
-    String statement = factory.createInsertSqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createInsertSqlString(QUOTED_TABLE_PATH, entryColumnData);
 
     String expectedStatement = String.format("INSERT INTO %s (%s,%s,%s,%s) VALUES (?,?,?,?)",
-        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME, KEY_COLUMN_2_NAME);
+        QUOTED_TABLE_PATH, quoted(VALUE_COLUMN_1_NAME), quoted(VALUE_COLUMN_2_NAME),
+        quoted(KEY_COLUMN_1_NAME), quoted(KEY_COLUMN_2_NAME));
     assertThat(statement).isEqualTo(expectedStatement);
   }
 
@@ -110,10 +119,11 @@ public class SqlStatementFactoryTest {
   public void getUpdateSqlStringGivenMultipleKeys() throws Exception {
     keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
 
-    String statement = factory.createUpdateSqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createUpdateSqlString(QUOTED_TABLE_PATH, entryColumnData);
 
     String expectedStatement = String.format("UPDATE %s SET %s = ?, %s = ? WHERE %s = ? AND %s = ?",
-        TABLE_NAME, VALUE_COLUMN_1_NAME, VALUE_COLUMN_2_NAME, KEY_COLUMN_1_NAME, KEY_COLUMN_2_NAME);
+        QUOTED_TABLE_PATH, quoted(VALUE_COLUMN_1_NAME), quoted(VALUE_COLUMN_2_NAME),
+        quoted(KEY_COLUMN_1_NAME), quoted(KEY_COLUMN_2_NAME));
     assertThat(statement).isEqualTo(expectedStatement);
   }
 
@@ -121,11 +131,12 @@ public class SqlStatementFactoryTest {
   public void getSelectQueryStringGivenMultipleKeys() throws Exception {
     keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
 
-    String statement = factory.createSelectQueryString(TABLE_NAME, entryColumnData);
+    String statement = factory.createSelectQueryString(QUOTED_TABLE_PATH, entryColumnData);
 
     String expectedStatement =
-        String.format("SELECT * FROM %s WHERE %s = ? AND %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME,
-            KEY_COLUMN_2_NAME);
+        String.format("SELECT * FROM %s WHERE %s = ? AND %s = ?", QUOTED_TABLE_PATH,
+            quoted(KEY_COLUMN_1_NAME),
+            quoted(KEY_COLUMN_2_NAME));
     assertThat(statement).isEqualTo(expectedStatement);
   }
 
@@ -133,11 +144,12 @@ public class SqlStatementFactoryTest {
   public void getDestroySqlStringGivenMultipleKeys() throws Exception {
     keyColumnData.add(new ColumnData(KEY_COLUMN_2_NAME, null, 0));
 
-    String statement = factory.createDestroySqlString(TABLE_NAME, entryColumnData);
+    String statement = factory.createDestroySqlString(QUOTED_TABLE_PATH, entryColumnData);
 
     String expectedStatement =
-        String.format("DELETE FROM %s WHERE %s = ? AND %s = ?", TABLE_NAME, KEY_COLUMN_1_NAME,
-            KEY_COLUMN_2_NAME);
+        String.format("DELETE FROM %s WHERE %s = ? AND %s = ?", QUOTED_TABLE_PATH,
+            quoted(KEY_COLUMN_1_NAME),
+            quoted(KEY_COLUMN_2_NAME));
     assertThat(statement).isEqualTo(expectedStatement);
   }
 
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
index 9ce9874..081c2a7 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataManagerTest.java
@@ -35,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 
 public class TableMetaDataManagerTest {
   private static final String TABLE_NAME = "testTable";
@@ -47,6 +48,7 @@ public class TableMetaDataManagerTest {
   ResultSet tablesResultSet;
   ResultSet primaryKeysResultSet;
   ResultSet columnResultSet;
+  RegionMapping regionMapping;
 
   @Before
   public void setup() throws Exception {
@@ -62,14 +64,17 @@ public class TableMetaDataManagerTest {
     columnResultSet = mock(ResultSet.class);
     when(databaseMetaData.getColumns(any(), any(), eq(TABLE_NAME), any()))
         .thenReturn(columnResultSet);
+    regionMapping = mock(RegionMapping.class);
+    when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
   }
 
   @Test
   public void returnsSinglePrimaryKeyColumnName() throws Exception {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
+    when(regionMapping.getIds()).thenReturn("");
 
-    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "");
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN));
     verify(connection).getMetaData();
@@ -78,22 +83,40 @@ public class TableMetaDataManagerTest {
   @Test
   public void returnsCompositePrimaryKeyColumnNames() throws Exception {
     setupCompositePrimaryKeysMetaData();
+    when(regionMapping.getIds()).thenReturn("");
 
-    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "");
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN, KEY_COLUMN2));
     verify(connection).getMetaData();
+    verify(databaseMetaData).getTables("", "", "%", null);
   }
 
+  @Test
+  public void verifyPostgreUsesPublicSchemaByDefault() throws Exception {
+    setupCompositePrimaryKeysMetaData();
+    when(regionMapping.getIds()).thenReturn("");
+    ResultSet schemas = mock(ResultSet.class);
+    when(schemas.next()).thenReturn(true).thenReturn(false);
+    when(schemas.getString("TABLE_SCHEM")).thenReturn("PUBLIC");
+    when(databaseMetaData.getSchemas(any(), any())).thenReturn(schemas);
+    when(databaseMetaData.getDatabaseProductName()).thenReturn("PostgreSQL");
 
+    TableMetaDataView data = tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
+
+    assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN, KEY_COLUMN2));
+    verify(connection).getMetaData();
+    verify(databaseMetaData).getTables("", "PUBLIC", "%", null);
+  }
 
   @Test
   public void givenNoColumnsAndNonNullIdsThenExpectException() throws Exception {
     setupTableMetaData();
     when(columnResultSet.next()).thenReturn(false);
+    when(regionMapping.getIds()).thenReturn("nonExistentId");
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "nonExistentId"))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class)
             .hasMessageContaining("The table testTable does not have a column named nonExistentId");
   }
@@ -103,9 +126,10 @@ public class TableMetaDataManagerTest {
     setupTableMetaData();
     when(columnResultSet.next()).thenReturn(true).thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("existingColumn");
+    when(regionMapping.getIds()).thenReturn("nonExistentId");
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "nonExistentId"))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class)
             .hasMessageContaining("The table testTable does not have a column named nonExistentId");
   }
@@ -117,9 +141,10 @@ public class TableMetaDataManagerTest {
     when(columnResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("nonexistentid")
         .thenReturn("NONEXISTENTID");
+    when(regionMapping.getIds()).thenReturn("nonExistentId");
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "nonExistentId"))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class).hasMessageContaining(
                 "The table testTable has more than one column that matches nonExistentId");
   }
@@ -132,9 +157,10 @@ public class TableMetaDataManagerTest {
         .thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("existentid").thenReturn("EXISTENTID")
         .thenReturn("ExistentId");
+    when(regionMapping.getIds()).thenReturn("ExistentId");
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "ExistentId");
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList("ExistentId"));
   }
@@ -148,10 +174,10 @@ public class TableMetaDataManagerTest {
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("LeadingNonKeyColumn")
         .thenReturn(KEY_COLUMN).thenReturn(KEY_COLUMN2)
         .thenReturn("NonKeyColumn");
+    when(regionMapping.getIds()).thenReturn(KEY_COLUMN + "," + KEY_COLUMN2);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME,
-            KEY_COLUMN + "," + KEY_COLUMN2);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList(KEY_COLUMN, KEY_COLUMN2));
   }
@@ -162,9 +188,10 @@ public class TableMetaDataManagerTest {
     setupTableMetaData();
     when(columnResultSet.next()).thenReturn(true).thenReturn(false);
     when(columnResultSet.getString("COLUMN_NAME")).thenReturn("existentid");
+    when(regionMapping.getIds()).thenReturn("ExistentId");
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, "ExistentId");
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getKeyColumnNames()).isEqualTo(Arrays.asList("ExistentId"));
   }
@@ -175,7 +202,7 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getIdentifierQuoteString()).isEqualTo("");
     verify(connection).getMetaData();
@@ -189,7 +216,7 @@ public class TableMetaDataManagerTest {
     when(databaseMetaData.getIdentifierQuoteString()).thenReturn(expectedQuoteString);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     assertThat(data.getIdentifierQuoteString()).isEqualTo(expectedQuoteString);
     verify(connection).getMetaData();
@@ -200,8 +227,8 @@ public class TableMetaDataManagerTest {
     setupPrimaryKeysMetaData();
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
-    tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
-    tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+    tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
+    tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
     verify(connection).getMetaData();
   }
 
@@ -211,7 +238,7 @@ public class TableMetaDataManagerTest {
     when(connection.getMetaData()).thenThrow(cause);
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class).hasMessageContaining("sql message");
   }
 
@@ -221,9 +248,9 @@ public class TableMetaDataManagerTest {
     when(tablesResultSet.getString("TABLE_NAME")).thenReturn("otherTable");
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class)
-            .hasMessage("no table was found that matches testTable");
+            .hasMessage("No table was found that matches \"" + TABLE_NAME + '"');
   }
 
   @Test
@@ -235,9 +262,25 @@ public class TableMetaDataManagerTest {
         .thenReturn(TABLE_NAME);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
-    assertThat(data.getTableName()).isEqualTo(TABLE_NAME);
+    assertThat(data.getQuotedTablePath()).isEqualTo(TABLE_NAME);
+  }
+
+  @Test
+  public void returnsQuotedTableNameWhenMetaDataHasQuoteId() throws Exception {
+    setupPrimaryKeysMetaData();
+    when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
+    when(tablesResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase())
+        .thenReturn(TABLE_NAME);
+    String QUOTE = "@@";
+    when(this.databaseMetaData.getIdentifierQuoteString()).thenReturn(QUOTE);
+
+    TableMetaDataView data =
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
+
+    assertThat(data.getQuotedTablePath()).isEqualTo(QUOTE + TABLE_NAME + QUOTE);
   }
 
   @Test
@@ -250,9 +293,9 @@ public class TableMetaDataManagerTest {
     when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME.toUpperCase());
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
-    assertThat(data.getTableName()).isEqualTo(TABLE_NAME.toUpperCase());
+    assertThat(data.getQuotedTablePath()).isEqualTo(TABLE_NAME.toUpperCase());
   }
 
   @Test
@@ -265,9 +308,23 @@ public class TableMetaDataManagerTest {
         .thenReturn(TABLE_NAME.toUpperCase());
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
+            .isInstanceOf(JdbcConnectorException.class)
+            .hasMessage("Multiple tables were found that match \"" + TABLE_NAME + '"');
+  }
+
+  @Test
+  public void throwsExceptionWhenTwoTablesHaveExactSameName() throws Exception {
+    setupPrimaryKeysMetaData();
+    when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
+    when(tablesResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+
+    when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME).thenReturn(TABLE_NAME);
+
+    assertThatThrownBy(
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class)
-            .hasMessage("Duplicate tables that match region name");
+            .hasMessage("Multiple tables were found that match \"" + TABLE_NAME + '"');
   }
 
   @Test
@@ -276,7 +333,7 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(false);
 
     assertThatThrownBy(
-        () -> tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null))
+        () -> tableMetaDataManager.getTableMetaDataView(connection, regionMapping))
             .isInstanceOf(JdbcConnectorException.class)
             .hasMessage("The table " + TABLE_NAME + " does not have a primary key column.");
   }
@@ -287,7 +344,7 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
     int dataType = data.getColumnDataType("unknownColumn");
 
     assertThat(dataType).isEqualTo(0);
@@ -307,7 +364,7 @@ public class TableMetaDataManagerTest {
         .thenReturn(columnDataType2);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
     int dataType1 = data.getColumnDataType(columnName1);
     int dataType2 = data.getColumnDataType(columnName2);
 
@@ -332,7 +389,7 @@ public class TableMetaDataManagerTest {
     Set<String> expectedColumnNames = new HashSet<>(Arrays.asList(columnName1, columnName2));
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
     Set<String> columnNames = data.getColumnNames();
 
     assertThat(columnNames).isEqualTo(expectedColumnNames);
@@ -345,7 +402,7 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     verify(primaryKeysResultSet).close();
   }
@@ -357,7 +414,7 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
     verify(columnResultSet).close();
   }
@@ -368,9 +425,9 @@ public class TableMetaDataManagerTest {
     when(primaryKeysResultSet.next()).thenReturn(true).thenReturn(false);
 
     TableMetaDataView data =
-        tableMetaDataManager.getTableMetaDataView(connection, TABLE_NAME, null);
+        tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
 
-    assertThat(data.getTableName()).isEqualTo(TABLE_NAME);
+    assertThat(data.getQuotedTablePath()).isEqualTo(TABLE_NAME);
   }
 
   private void setupPrimaryKeysMetaData() throws SQLException {
@@ -390,5 +447,227 @@ public class TableMetaDataManagerTest {
     when(tablesResultSet.getString("TABLE_NAME")).thenReturn(TABLE_NAME);
   }
 
+  @Test
+  public void computeTableNameGivenRegionMappingTableNameReturnsIt() {
+    when(regionMapping.getTableName()).thenReturn("myTableName"); // explicit table name set
+
+    String result = tableMetaDataManager.computeTableName(regionMapping);
+
+    assertThat(result).isEqualTo("myTableName"); // returned unchanged
+  }
+
+  @Test
+  public void computeTableNameGivenRegionMappingRegionNameReturnsItIfTableNameIsNull() {
+    when(regionMapping.getTableName()).thenReturn(null); // no table name configured
+    when(regionMapping.getRegionName()).thenReturn("myRegionName");
+
+    String result = tableMetaDataManager.computeTableName(regionMapping);
+
+    assertThat(result).isEqualTo("myRegionName"); // falls back to the region name
+  }
+
+  @Test
+  public void getCatalogNameFromMetaDataGivenNullCatalogReturnsEmptyString() throws SQLException {
+    when(regionMapping.getCatalog()).thenReturn(null); // no catalog configured
+
+    String result = tableMetaDataManager.getCatalogNameFromMetaData(null, regionMapping);
+
+    assertThat(result).isEqualTo(""); // "" expected; first argument (metadata) is null
+  }
+
+  @Test
+  public void getCatalogNameFromMetaDataGivenEmptyCatalogReturnsEmptyString() throws SQLException {
+    when(regionMapping.getCatalog()).thenReturn(""); // catalog configured as ""
+
+    String result = tableMetaDataManager.getCatalogNameFromMetaData(null, regionMapping);
+
+    assertThat(result).isEqualTo(""); // "" expected; first argument (metadata) is null
+  }
+
+  @Test
+  public void getCatalogNameFromMetaDataGivenCatalogReturnIt() throws SQLException {
+    String myCatalog = "myCatalog";
+    when(regionMapping.getCatalog()).thenReturn(myCatalog);
+    ResultSet catalogsResultSet = mock(ResultSet.class);
+    when(catalogsResultSet.next()).thenReturn(true).thenReturn(false); // a single row
+    when(catalogsResultSet.getString("TABLE_CAT")).thenReturn(myCatalog); // row's catalog name
+    when(databaseMetaData.getCatalogs()).thenReturn(catalogsResultSet);
+
+    String result =
+        tableMetaDataManager.getCatalogNameFromMetaData(databaseMetaData, regionMapping);
+
+    assertThat(result).isEqualTo(myCatalog);
+  }
+
+  @Test
+  public void getSchemaNameFromMetaDataGivenNullSchemaReturnsEmptyString() throws SQLException {
+    when(regionMapping.getSchema()).thenReturn(null); // no schema configured
+
+    String result =
+        tableMetaDataManager.getSchemaNameFromMetaData(databaseMetaData, regionMapping, null);
+
+    assertThat(result).isEqualTo(""); // "" expected; third argument (catalog filter) is null
+  }
+
+  @Test
+  public void getSchemaNameFromMetaDataGivenEmptySchemaReturnsEmptyString() throws SQLException {
+    when(regionMapping.getSchema()).thenReturn(""); // schema configured as ""
+
+    String result =
+        tableMetaDataManager.getSchemaNameFromMetaData(databaseMetaData, regionMapping, null);
+
+    assertThat(result).isEqualTo(""); // "" expected; third argument (catalog filter) is null
+  }
+
+  @Test
+  public void getSchemaNameFromMetaDataGivenSchemaReturnsIt() throws SQLException {
+    String mySchema = "mySchema";
+    when(regionMapping.getSchema()).thenReturn(mySchema);
+    ResultSet schemasResultSet = mock(ResultSet.class);
+    when(schemasResultSet.next()).thenReturn(true).thenReturn(false); // a single row
+    when(schemasResultSet.getString("TABLE_SCHEM")).thenReturn(mySchema);
+    String catalogFilter = "myCatalogFilter"; // stub below matches only this filter and "%"
+    when(databaseMetaData.getSchemas(catalogFilter, "%")).thenReturn(schemasResultSet);
+
+    String result = tableMetaDataManager.getSchemaNameFromMetaData(databaseMetaData, regionMapping,
+        catalogFilter);
+
+    assertThat(result).isEqualTo(mySchema);
+  }
+
+  @Test
+  public void getSchemaNameFromMetaDataGivenNullSchemaOnPostgresReturnsPublic()
+      throws SQLException {
+    String defaultPostgresSchema = "public"; // result expected when no schema is configured
+    when(regionMapping.getSchema()).thenReturn(null);
+    ResultSet schemasResultSet = mock(ResultSet.class);
+    when(schemasResultSet.next()).thenReturn(true).thenReturn(false); // a single row
+    when(schemasResultSet.getString("TABLE_SCHEM")).thenReturn(defaultPostgresSchema);
+    String catalogFilter = "myCatalogFilter";
+    when(databaseMetaData.getSchemas(catalogFilter, "%")).thenReturn(schemasResultSet);
+    when(databaseMetaData.getDatabaseProductName()).thenReturn("PostgreSQL");
+
+    String result = tableMetaDataManager.getSchemaNameFromMetaData(databaseMetaData, regionMapping,
+        catalogFilter);
+
+    assertThat(result).isEqualTo(defaultPostgresSchema);
+  }
+
+  @Test
+  public void findMatchInResultSetGivenEmptyResultSetThrows() throws SQLException {
+    String stringToFind = "stringToFind";
+    ResultSet resultSet = mock(ResultSet.class); // unstubbed mock: next() returns false
+    String column = "column";
+    String description = "description";
+
+    assertThatThrownBy(
+        () -> tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column,
+            description))
+                .isInstanceOf(JdbcConnectorException.class)
+                .hasMessageContaining(
+                    "No " + description + " was found that matches \"" + stringToFind + '"');
+  }
+
+  @Test
+  public void findMatchInResultSetGivenNullResultSetThrows() throws SQLException {
+    String stringToFind = "stringToFind";
+    ResultSet resultSet = null; // expected to fail with the "No ... was found" message
+    String column = "column";
+    String description = "description";
+
+    assertThatThrownBy(
+        () -> tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column,
+            description))
+                .isInstanceOf(JdbcConnectorException.class)
+                .hasMessageContaining(
+                    "No " + description + " was found that matches \"" + stringToFind + '"');
+  }
+
+  @Test
+  public void findMatchInResultSetGivenResultSetWithNoMatchThrows() throws SQLException {
+    String stringToFind = "stringToFind";
+    String column = "column";
+    String description = "description";
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(false); // a single row
+    when(resultSet.getString(column)).thenReturn("doesNotMatch"); // the only row differs
+
+    assertThatThrownBy(
+        () -> tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column,
+            description))
+                .isInstanceOf(JdbcConnectorException.class)
+                .hasMessageContaining(
+                    "No " + description + " was found that matches \"" + stringToFind + '"');
+  }
+
+  @Test
+  public void findMatchInResultSetGivenResultSetWithMultipleExactMatchesThrows()
+      throws SQLException {
+    String stringToFind = "stringToFind";
+    String column = "column";
+    String description = "description";
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false); // two rows
+    when(resultSet.getString(column)).thenReturn("stringToFind"); // both rows match exactly
+
+    assertThatThrownBy(
+        () -> tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column,
+            description))
+                .isInstanceOf(JdbcConnectorException.class)
+                .hasMessageContaining(
+                    "Multiple " + description + "s were found that match \"" + stringToFind + '"');
+  }
+
+  @Test
+  public void findMatchInResultSetGivenResultSetWithMultipleInexactMatchesThrows()
+      throws SQLException {
+    String stringToFind = "stringToFind";
+    String column = "column";
+    String description = "description";
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false); // two rows
+    when(resultSet.getString(column)).thenReturn("STRINGToFind"); // both differ only in case
+
+    assertThatThrownBy(
+        () -> tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column,
+            description))
+                .isInstanceOf(JdbcConnectorException.class)
+                .hasMessageContaining(
+                    "Multiple " + description + "s were found that match \"" + stringToFind + '"');
+  }
+
+  @Test
+  public void findMatchInResultSetGivenResultSetWithOneInexactMatchReturnsIt() throws SQLException {
+    String stringToFind = "stringToFind";
+    String column = "column";
+    String description = "description";
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(false); // a single row
+    String inexactMatch = "STRINGToFind"; // differs from stringToFind only in case
+    when(resultSet.getString(column)).thenReturn(inexactMatch);
+
+    String result =
+        tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column, description);
+
+    assertThat(result).isEqualTo(inexactMatch); // result keeps the row's spelling
+  }
+
+  @Test
+  public void findMatchInResultSetGivenResultSetWithOneExactMatchAndMultipleInexactReturnsTheExactMatch()
+      throws SQLException {
+    String stringToFind = "stringToFind";
+    String column = "column";
+    String description = "description";
+    ResultSet resultSet = mock(ResultSet.class);
+    when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    String inexactMatch = "STRINGToFind"; // differs from stringToFind only in case
+    when(resultSet.getString(column)).thenReturn(inexactMatch).thenReturn(stringToFind)
+        .thenReturn(inexactMatch); // row order: inexact, exact, inexact
+
+    String result =
+        tableMetaDataManager.findMatchInResultSet(stringToFind, resultSet, column, description);
+
+    assertThat(result).isEqualTo(stringToFind); // the exact row is the one returned
+  }
 
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataTest.java
new file mode 100644
index 0000000..8028b11
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/TableMetaDataTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class TableMetaDataTest {
+  private String catalogName;
+  private String schemaName;
+  private String tableName;
+  private List<String> keyColumnNames;
+  private String quoteString;
+  private Map<String, Integer> dataTypes;
+
+  private TableMetaData tableMetaData; // object under test, built by createTableMetaData()
+
+  private void createTableMetaData() { // fields a test never assigns are passed as null
+    tableMetaData = new TableMetaData(catalogName, schemaName, tableName, keyColumnNames,
+        quoteString, dataTypes);
+  }
+
+  @Test
+  public void verifyGetIdentifierQuoteString() {
+    quoteString = "MyQuote";
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getIdentifierQuoteString()).isEqualTo(quoteString);
+  }
+
+  @Test
+  public void verifyKeyColumnNames() {
+    keyColumnNames = Arrays.asList("c1", "c2");
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getKeyColumnNames()).isEqualTo(keyColumnNames);
+  }
+
+  @Test
+  public void verifyColumnNames() {
+    Map<String, Integer> map = new HashMap<>();
+    map.put("k1", 1);
+    map.put("k2", 2);
+    dataTypes = map;
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getColumnNames()).isEqualTo(dataTypes.keySet());
+  }
+
+  @Test
+  public void verifyColumnDataType() {
+    Map<String, Integer> map = new HashMap<>();
+    map.put("k1", 1);
+    map.put("k2", 2);
+    dataTypes = map;
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getColumnDataType("k1")).isEqualTo(1);
+    assertThat(tableMetaData.getColumnDataType("k2")).isEqualTo(2);
+    assertThat(tableMetaData.getColumnDataType("k3")).isEqualTo(0); // "k3" is not in the map
+  }
+
+  @Test
+  public void verifyTableWithQuoteAndNoCatalogOrSchema() {
+    quoteString = "+";
+    tableName = "myTable"; // catalogName and schemaName are never assigned here
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getQuotedTablePath()).isEqualTo(quoteString + tableName + quoteString);
+  }
+
+  @Test
+  public void verifyTableWithQuoteAndEmptyCatalogAndSchema() {
+    quoteString = "+";
+    tableName = "myTable";
+    catalogName = ""; // empty rather than unassigned
+    schemaName = ""; // empty rather than unassigned
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getQuotedTablePath()).isEqualTo(quoteString + tableName + quoteString);
+  }
+
+  @Test
+  public void verifyTableWithQuoteAndSchemaAndNoCatalog() {
+    quoteString = "+";
+    tableName = "myTable";
+    schemaName = "mySchema"; // catalogName is never assigned here
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getQuotedTablePath()).isEqualTo(
+        quoteString + schemaName + quoteString + "." + quoteString + tableName + quoteString);
+  }
+
+  @Test
+  public void verifyTableWithQuoteAndCatalogAndNoSchema() {
+    quoteString = "+";
+    tableName = "myTable";
+    catalogName = "myCatalog"; // schemaName is never assigned here
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getQuotedTablePath()).isEqualTo(
+        quoteString + catalogName + quoteString + "." + quoteString + tableName + quoteString);
+  }
+
+  @Test
+  public void verifyTableWithQuoteAndSchemaCatalog() {
+    quoteString = "+";
+    tableName = "myTable";
+    schemaName = "mySchema";
+    catalogName = "myCatalog";
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getQuotedTablePath()) // order: catalog, schema, table; each quoted
+        .isEqualTo(quoteString + catalogName + quoteString + "." + quoteString + schemaName
+            + quoteString + "." + quoteString + tableName + quoteString);
+  }
+
+  @Test
+  public void verifyTableWithSchemaCatalogAndNoQuote() {
+    tableName = "myTable";
+    schemaName = "mySchema";
+    catalogName = "myCatalog"; // quoteString is never assigned here
+
+    createTableMetaData();
+
+    assertThat(tableMetaData.getQuotedTablePath())
+        .isEqualTo(catalogName + "." + schemaName + "." + tableName);
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
index 6981d81..2d66fbf 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingCommandTest.java
@@ -117,9 +117,11 @@ public class CreateMappingCommandTest {
     setupRequiredPreconditions();
     results.add(successFunctionResult);
     String ids = "ids";
+    String catalog = "catalog";
+    String schema = "schema";
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, ids);
+        tableName, pdxClass, false, ids, catalog, schema);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.OK);
     Object[] results = (Object[]) result.getConfigObject();
@@ -131,6 +133,8 @@ public class CreateMappingCommandTest {
     assertThat(regionMapping.getTableName()).isEqualTo(tableName);
     assertThat(regionMapping.getPdxName()).isEqualTo(pdxClass);
     assertThat(regionMapping.getIds()).isEqualTo(ids);
+    assertThat(regionMapping.getCatalog()).isEqualTo(catalog);
+    assertThat(regionMapping.getSchema()).isEqualTo(schema);
     assertThat(synchronous).isFalse();
   }
 
@@ -140,7 +144,7 @@ public class CreateMappingCommandTest {
     results.add(successFunctionResult);
 
     ResultModel result = createRegionMappingCommand.createMapping("/" + regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.OK);
     Object[] results = (Object[]) result.getConfigObject();
@@ -155,7 +159,7 @@ public class CreateMappingCommandTest {
     results.clear();
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
   }
@@ -166,7 +170,7 @@ public class CreateMappingCommandTest {
     doReturn(null).when(createRegionMappingCommand).getConfigurationPersistenceService();
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
     assertThat(result.toString()).contains("Cluster Configuration must be enabled.");
@@ -183,7 +187,7 @@ public class CreateMappingCommandTest {
     when(cacheConfig.getRegions()).thenReturn(Collections.emptyList());
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
     assertThat(result.toString())
@@ -212,7 +216,7 @@ public class CreateMappingCommandTest {
     when(matchingRegion.getCustomRegionElements()).thenReturn(customList);
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
     assertThat(result.toString()).contains("A JDBC mapping for " + regionName + " already exists.");
@@ -236,7 +240,7 @@ public class CreateMappingCommandTest {
     when(matchingRegion.getRegionAttributes()).thenReturn(loaderAttribute);
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
     assertThat(result.toString()).contains("The existing region " + regionName
@@ -261,7 +265,7 @@ public class CreateMappingCommandTest {
     when(matchingRegion.getRegionAttributes()).thenReturn(writerAttribute);
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, true, null);
+        tableName, pdxClass, true, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
     assertThat(result.toString()).contains("The existing region " + regionName
@@ -290,7 +294,7 @@ public class CreateMappingCommandTest {
     when(cacheConfig.getAsyncEventQueues()).thenReturn(asyncEventQueues);
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, true, null);
+        tableName, pdxClass, true, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.OK);
   }
@@ -318,7 +322,7 @@ public class CreateMappingCommandTest {
     when(cacheConfig.getAsyncEventQueues()).thenReturn(asyncEventQueues);
 
     ResultModel result = createRegionMappingCommand.createMapping(regionName, dataSourceName,
-        tableName, pdxClass, false, null);
+        tableName, pdxClass, false, null, null, null);
 
     assertThat(result.getStatus()).isSameAs(Result.Status.ERROR);
     assertThat(result.toString())
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
index f28a2fa..e14ca8d 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunctionTest.java
@@ -76,7 +76,7 @@ public class CreateMappingFunctionTest {
     distributedMember = mock(DistributedMember.class);
     service = mock(JdbcConnectorService.class);
 
-    regionMapping = new RegionMapping(REGION_NAME, null, null, null, null);
+    regionMapping = new RegionMapping(REGION_NAME, null, null, null, null, null, null);
 
     when(context.getResultSender()).thenReturn(resultSender);
     when(context.getCache()).thenReturn(cache);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java
index 674b1ae..c4bd45c 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java
@@ -69,7 +69,7 @@ public class DescribeMappingCommandTest {
         null);
 
     RegionMapping mapping =
-        new RegionMapping("region", "class1", "table1", "name1", "myId");
+        new RegionMapping("region", "class1", "table1", "name1", "myId", "myCatalog", "mySchema");
 
     ResultCollector rc = mock(ResultCollector.class);
     doReturn(rc).when(command).executeFunction(any(), any(), any(Set.class));
@@ -81,7 +81,9 @@ public class DescribeMappingCommandTest {
         .containsOutput("data-source", "name1")
         .containsOutput("table", "table1")
         .containsOutput("pdx-name", "class1")
-        .containsOutput("id", "myId");
+        .containsOutput("id", "myId")
+        .containsOutput("catalog", "myCatalog")
+        .containsOutput("schema", "mySchema");
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandTest.java
index 6a2c719..190ccc9 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/ListMappingCommandTest.java
@@ -62,9 +62,9 @@ public class ListMappingCommandTest {
         null);
 
     RegionMapping mapping1 =
-        new RegionMapping("region1", "class1", "table1", "name1", null);
+        new RegionMapping("region1", "class1", "table1", "name1", null, null, null);
     RegionMapping mapping2 =
-        new RegionMapping("region2", "class2", "table2", "name2", null);
+        new RegionMapping("region2", "class2", "table2", "name2", null, null, null);
 
     ResultCollector rc = mock(ResultCollector.class);
     doReturn(rc).when(command).executeFunction(any(), any(), any(Set.class));
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
index 9b15900..a7c8bf0 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/xml/ElementTypeTest.java
@@ -16,9 +16,11 @@ package org.apache.geode.connectors.jdbc.internal.xml;
 
 
 import static org.apache.geode.connectors.jdbc.internal.xml.ElementType.JDBC_MAPPING;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.CATALOG;
 import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.DATA_SOURCE;
 import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.IDS;
 import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.PDX_NAME;
+import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.SCHEMA;
 import static org.apache.geode.connectors.jdbc.internal.xml.JdbcConnectorServiceXmlParser.TABLE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -75,6 +77,8 @@ public class ElementTypeTest {
     when(attributes.getValue(TABLE)).thenReturn("table");
     when(attributes.getValue(PDX_NAME)).thenReturn("pdxClass");
     when(attributes.getValue(IDS)).thenReturn("ids");
+    when(attributes.getValue(CATALOG)).thenReturn("catalog");
+    when(attributes.getValue(SCHEMA)).thenReturn("schema");
     when(regionCreation.getFullPath()).thenReturn("/region");
     stack.push(regionCreation);
 
@@ -86,6 +90,8 @@ public class ElementTypeTest {
     assertThat(regionMapping.getTableName()).isEqualTo("table");
     assertThat(regionMapping.getPdxName()).isEqualTo("pdxClass");
     assertThat(regionMapping.getIds()).isEqualTo("ids");
+    assertThat(regionMapping.getCatalog()).isEqualTo("catalog");
+    assertThat(regionMapping.getSchema()).isEqualTo("schema");
   }
 
   @Test