You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/05/17 22:45:50 UTC

[iceberg] branch master updated: Improve Error Handling to map Snowflake Exceptions into Iceberg Exceptions (#6952)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 202af0335b Improve Error Handling to map Snowflake Exceptions into Iceberg Exceptions (#6952)
202af0335b is described below

commit 202af0335b654aa89373a0c14a62c34d1341bd07
Author: AnubhavSiddharth <an...@gmail.com>
AuthorDate: Wed May 17 15:45:43 2023 -0700

    Improve Error Handling to map Snowflake Exceptions into Iceberg Exceptions (#6952)
    
    * Improve Error Handling to map Snowflake Exceptions into Iceberg Exceptions
    
    * Fix code style
    
    * Stray character
    
    * Address code review comments
    
    * Style cop
    
    * Refactor and cleanup helper method
    
    * Refactor helper method
    
    * Change Set declaration
    
    * Address review feedback to refactor tests to be clearer
    
    * Refactor Exception conversion method
    
    ---------
    
    Co-authored-by: Anubhav Sudhakar <an...@snowflake.com>
---
 .../iceberg/snowflake/JdbcSnowflakeClient.java     |  57 +++-
 .../apache/iceberg/snowflake/SnowflakeCatalog.java |   2 +-
 .../iceberg/snowflake/JdbcSnowflakeClientTest.java | 363 +++++++++++++++++++--
 3 files changed, 387 insertions(+), 35 deletions(-)

diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
index ffa4107006..d283fa9064 100644
--- a/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java
@@ -23,11 +23,15 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.jdbc.JdbcClientPool;
 import org.apache.iceberg.jdbc.UncheckedInterruptedException;
 import org.apache.iceberg.jdbc.UncheckedSQLException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
@@ -37,6 +41,15 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 class JdbcSnowflakeClient implements SnowflakeClient {
   static final String EXPECTED_JDBC_IMPL = "net.snowflake.client.jdbc.SnowflakeDriver";
 
+  @VisibleForTesting
+  static final Set<Integer> DATABASE_NOT_FOUND_ERROR_CODES = ImmutableSet.of(2001, 2003, 2043);
+
+  @VisibleForTesting
+  static final Set<Integer> SCHEMA_NOT_FOUND_ERROR_CODES = ImmutableSet.of(2001, 2003, 2043);
+
+  @VisibleForTesting
+  static final Set<Integer> TABLE_NOT_FOUND_ERROR_CODES = ImmutableSet.of(2001, 2003, 2043);
+
   @FunctionalInterface
   interface ResultSetParser<T> {
     T parse(ResultSet rs) throws SQLException;
@@ -153,7 +166,7 @@ class JdbcSnowflakeClient implements SnowflakeClient {
                   queryHarness.query(
                       conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, database.databaseName()));
     } catch (SQLException e) {
-      if (e.getErrorCode() == 2003 && e.getMessage().contains("does not exist")) {
+      if (DATABASE_NOT_FOUND_ERROR_CODES.contains(e.getErrorCode())) {
         return false;
       }
       throw new UncheckedSQLException(e, "Failed to check if database '%s' exists", database);
@@ -186,7 +199,7 @@ class JdbcSnowflakeClient implements SnowflakeClient {
                   queryHarness.query(
                       conn, finalQuery, TABLE_RESULT_SET_HANDLER, schema.toIdentifierString()));
     } catch (SQLException e) {
-      if (e.getErrorCode() == 2003 && e.getMessage().contains("does not exist")) {
+      if (SCHEMA_NOT_FOUND_ERROR_CODES.contains(e.getErrorCode())) {
         return false;
       }
       throw new UncheckedSQLException(e, "Failed to check if schema '%s' exists", schema);
@@ -208,7 +221,8 @@ class JdbcSnowflakeClient implements SnowflakeClient {
                   queryHarness.query(
                       conn, "SHOW DATABASES IN ACCOUNT", DATABASE_RESULT_SET_HANDLER));
     } catch (SQLException e) {
-      throw new UncheckedSQLException(e, "Failed to list databases");
+      throw snowflakeExceptionToIcebergException(
+          SnowflakeIdentifier.ofRoot(), e, "Failed to list databases");
     } catch (InterruptedException e) {
       throw new UncheckedInterruptedException(e, "Interrupted while listing databases");
     }
@@ -250,7 +264,8 @@ class JdbcSnowflakeClient implements SnowflakeClient {
                   queryHarness.query(
                       conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, finalQueryParams));
     } catch (SQLException e) {
-      throw new UncheckedSQLException(e, "Failed to list schemas for scope '%s'", scope);
+      throw snowflakeExceptionToIcebergException(
+          scope, e, String.format("Failed to list schemas for scope '%s'", scope));
     } catch (InterruptedException e) {
       throw new UncheckedInterruptedException(
           e, "Interrupted while listing schemas for scope '%s'", scope);
@@ -298,7 +313,8 @@ class JdbcSnowflakeClient implements SnowflakeClient {
               conn ->
                   queryHarness.query(conn, finalQuery, TABLE_RESULT_SET_HANDLER, finalQueryParams));
     } catch (SQLException e) {
-      throw new UncheckedSQLException(e, "Failed to list tables for scope '%s'", scope);
+      throw snowflakeExceptionToIcebergException(
+          scope, e, String.format("Failed to list tables for scope '%s'", scope));
     } catch (InterruptedException e) {
       throw new UncheckedInterruptedException(
           e, "Interrupted while listing tables for scope '%s'", scope);
@@ -331,7 +347,10 @@ class JdbcSnowflakeClient implements SnowflakeClient {
                       TABLE_METADATA_RESULT_SET_HANDLER,
                       tableIdentifier.toIdentifierString()));
     } catch (SQLException e) {
-      throw new UncheckedSQLException(e, "Failed to get table metadata for '%s'", tableIdentifier);
+      throw snowflakeExceptionToIcebergException(
+          tableIdentifier,
+          e,
+          String.format("Failed to get table metadata for '%s'", tableIdentifier));
     } catch (InterruptedException e) {
       throw new UncheckedInterruptedException(
           e, "Interrupted while getting table metadata for '%s'", tableIdentifier);
@@ -343,4 +362,30 @@ class JdbcSnowflakeClient implements SnowflakeClient {
   public void close() {
     connectionPool.close();
   }
+
+  private RuntimeException snowflakeExceptionToIcebergException(
+      SnowflakeIdentifier identifier, SQLException ex, String defaultExceptionMessage) {
+    // NoSuchNamespace exception for Database and Schema cases
+    if ((identifier.type() == SnowflakeIdentifier.Type.DATABASE
+            && DATABASE_NOT_FOUND_ERROR_CODES.contains(ex.getErrorCode()))
+        || (identifier.type() == SnowflakeIdentifier.Type.SCHEMA
+            && SCHEMA_NOT_FOUND_ERROR_CODES.contains(ex.getErrorCode()))) {
+      return new NoSuchNamespaceException(
+          ex,
+          "Identifier not found: '%s'. Underlying exception: '%s'",
+          identifier,
+          ex.getMessage());
+    }
+    // NoSuchTable exception for Table cases
+    else if (identifier.type() == SnowflakeIdentifier.Type.TABLE
+        && TABLE_NOT_FOUND_ERROR_CODES.contains(ex.getErrorCode())) {
+      return new NoSuchTableException(
+          ex,
+          "Identifier not found: '%s'. Underlying exception: '%s'",
+          identifier,
+          ex.getMessage());
+    }
+    // Unchecked SQL Exception in all other cases as fall back
+    return new UncheckedSQLException(ex, "Exception Message: %s", defaultExceptionMessage);
+  }
 }
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
index 28b80991be..dd20c8ded9 100644
--- a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
@@ -209,7 +209,7 @@ public class SnowflakeCatalog extends BaseMetastoreCatalog
       default:
         throw new IllegalArgumentException(
             String.format(
-                "loadNamespaceMetadat must be at either DATABASE or SCHEMA level; got %s from namespace %s",
+                "loadNamespaceMetadata must be at either DATABASE or SCHEMA level; got %s from namespace %s",
                 id, namespace));
     }
     if (namespaceExists) {
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
index 4f9b21261c..4fd0ae0e21 100644
--- a/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.snowflake;
 
+import static org.apache.iceberg.snowflake.JdbcSnowflakeClient.DATABASE_NOT_FOUND_ERROR_CODES;
+import static org.apache.iceberg.snowflake.JdbcSnowflakeClient.SCHEMA_NOT_FOUND_ERROR_CODES;
+import static org.apache.iceberg.snowflake.JdbcSnowflakeClient.TABLE_NOT_FOUND_ERROR_CODES;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -29,6 +32,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.jdbc.JdbcClientPool;
 import org.apache.iceberg.jdbc.UncheckedInterruptedException;
 import org.apache.iceberg.jdbc.UncheckedSQLException;
@@ -96,12 +101,49 @@ public class JdbcSnowflakeClientTest {
   @Test
   public void testDatabaseDoesntExist() throws SQLException {
     when(mockResultSet.next())
-        .thenThrow(new SQLException("Database does not exist", "2000", 2003, null));
+        .thenThrow(new SQLException("Database does not exist", "2000", 2003, null))
+        .thenThrow(
+            new SQLException(
+                "Database does not exist, or operation cannot be performed", "2000", 2043, null))
+        .thenThrow(
+            new SQLException("Database does not exist or not authorized", "2000", 2001, null));
+
+    // Error code 2003
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .isFalse();
+
+    // Error code 2043
+    Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .isFalse();
 
+    // Error code 2001
     Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
         .isFalse();
   }
 
+  @Test
+  public void testDatabaseFailureWithOtherException() throws SQLException {
+    Exception injectedException = new SQLException("Some other exception", "2000", 2, null);
+    when(mockResultSet.next()).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withMessageContaining("Failed to check if database 'DATABASE: 'DB_1'' exists")
+        .withCause(injectedException);
+  }
+
+  @Test
+  public void testDatabaseFailureWithInterruptedException()
+      throws SQLException, InterruptedException {
+    Exception injectedException = new InterruptedException("Fake interrupted exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withMessageContaining("Interrupted while checking if database 'DATABASE: 'DB_1'' exists")
+        .withCause(injectedException);
+  }
+
   @Test
   public void testSchemaExists() throws SQLException {
     when(mockResultSet.next())
@@ -134,20 +176,70 @@ public class JdbcSnowflakeClientTest {
   @Test
   public void testSchemaDoesntExistNoSchemaFoundException() throws SQLException {
     when(mockResultSet.next())
-        .thenThrow(new SQLException("Schema does not exist", "2000", 2003, null));
+        // The Database exists check should pass, followed by Error code 2003 for Schema exists
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenThrow(new SQLException("Schema does not exist", "2000", 2003, null))
+        // The Database exists check should pass, followed by Error code 2043 for Schema exists
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenThrow(
+            new SQLException(
+                "Schema does not exist, or operation cannot be performed", "2000", 2043, null))
+        // The Database exists check should pass, followed by Error code 2001 for Schema exists
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenThrow(new SQLException("Schema does not exist or not authorized", "2000", 2001, null));
+
+    when(mockResultSet.getString("name")).thenReturn("DB1").thenReturn("SCHEMA1");
+    when(mockResultSet.getString("database_name")).thenReturn("DB1");
+
+    // Error code 2003
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2")))
+        .isFalse();
+
+    // Error code 2043
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2")))
+        .isFalse();
 
+    // Error code 2001
     Assertions.assertThat(
-            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2")))
         .isFalse();
   }
 
   @Test
   public void testSchemaFailureWithOtherException() throws SQLException {
-    when(mockResultSet.next()).thenThrow(new SQLException("Some other exception", "2000", 2, null));
+    Exception injectedException = new SQLException("Some other exception", "2000", 2, null);
+    when(mockResultSet.next())
+        // The Database exists check should pass, followed by Error code 2 for Schema exists
+        .thenReturn(true)
+        .thenReturn(false)
+        .thenThrow(injectedException);
+
+    when(mockResultSet.getString("name")).thenReturn("DB1").thenReturn("SCHEMA1");
+    when(mockResultSet.getString("database_name")).thenReturn("DB1");
 
     Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
         .isThrownBy(
-            () -> snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")));
+            () -> snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2")))
+        .withMessageContaining("Failed to check if schema 'SCHEMA: 'DB_1.SCHEMA_2'' exists")
+        .withCause(injectedException);
+  }
+
+  @Test
+  public void testSchemaFailureWithInterruptedException()
+      throws SQLException, InterruptedException {
+    Exception injectedException = new InterruptedException("Fake Interrupted exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(
+            () -> snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2")))
+        .withMessageContaining("Interrupted while checking if database 'DATABASE: 'DB_1'' exists")
+        .withCause(injectedException);
   }
 
   @Test
@@ -170,6 +262,53 @@ public class JdbcSnowflakeClientTest {
             SnowflakeIdentifier.ofDatabase("DB_3"));
   }
 
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing databases at Root level.
+   */
+  @Test
+  public void testListDatabasesSQLExceptionAtRootLevel() throws SQLException, InterruptedException {
+    Exception injectedException =
+        new SQLException(String.format("SQL exception with Error Code %d", 0), "2000", 0, null);
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listDatabases())
+        .withMessageContaining("Failed to list databases")
+        .withCause(injectedException);
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing databases if there is no error code.
+   */
+  @Test
+  public void testListDatabasesSQLExceptionWithoutErrorCode()
+      throws SQLException, InterruptedException {
+    Exception injectedException = new SQLException("Fake SQL exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listDatabases())
+        .withMessageContaining("Failed to list databases")
+        .withCause(injectedException);
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when listing databases.
+   */
+  @Test
+  public void testListDatabasesInterruptedException() throws SQLException, InterruptedException {
+    Exception injectedException = new InterruptedException("Fake interrupted exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listDatabases())
+        .withMessageContaining("Interrupted while listing databases")
+        .withCause(injectedException);
+  }
+
   /**
    * For the root scope, expect an underlying query to list schemas at the ACCOUNT level with no
    * query parameters.
@@ -231,15 +370,69 @@ public class JdbcSnowflakeClientTest {
 
   /**
    * Any unexpected SQLException from the underlying connection will propagate out as an
-   * UncheckedSQLException when listing schemas.
+   * UncheckedSQLException when listing schemas at Root level.
    */
   @Test
-  public void testListSchemasSQLException() throws SQLException, InterruptedException {
-    when(mockClientPool.run(any(ClientPool.Action.class)))
-        .thenThrow(new SQLException("Fake SQL exception"));
+  public void testListSchemasSQLExceptionAtRootLevel() throws SQLException, InterruptedException {
+    Exception injectedException =
+        new SQLException(String.format("SQL exception with Error Code %d", 0), "2000", 0, null);
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofRoot()))
+        .withMessageContaining("Failed to list schemas for scope 'ROOT: '''")
+        .withCause(injectedException);
+  }
+
+  /**
+   * Any unexpected SQLException with specific error codes from the underlying connection will
+   * propagate out as a NoSuchNamespaceException when listing schemas at Database level.
+   */
+  @Test
+  public void testListSchemasSQLExceptionAtDatabaseLevel()
+      throws SQLException, InterruptedException {
+    for (Integer errorCode : DATABASE_NOT_FOUND_ERROR_CODES) {
+      Exception injectedException =
+          new SQLException(
+              String.format("SQL exception with Error Code %d", errorCode),
+              "2000",
+              errorCode,
+              null);
+      when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+      Assertions.assertThatExceptionOfType(NoSuchNamespaceException.class)
+          .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+          .withMessageContaining(
+              String.format(
+                  "Identifier not found: 'DATABASE: 'DB_1''. Underlying exception: 'SQL exception with Error Code %d'",
+                  errorCode))
+          .withCause(injectedException);
+    }
+  }
+
+  /** Listing schemas is not supported at the Schema level. */
+  @Test
+  public void testListSchemasAtSchemaLevel() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2")))
+        .withMessageContaining("Unsupported scope type for listSchemas: SCHEMA: 'DB_1.SCHEMA_2'");
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing schemas if there is no error code.
+   */
+  @Test
+  public void testListSchemasSQLExceptionWithoutErrorCode()
+      throws SQLException, InterruptedException {
+    Exception injectedException = new SQLException("Fake SQL exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
     Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
         .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
-        .withStackTraceContaining("Fake SQL exception");
+        .withMessageContaining("Failed to list schemas for scope 'DATABASE: 'DB_1''")
+        .withCause(injectedException);
   }
 
   /**
@@ -248,11 +441,13 @@ public class JdbcSnowflakeClientTest {
    */
   @Test
   public void testListSchemasInterruptedException() throws SQLException, InterruptedException {
-    when(mockClientPool.run(any(ClientPool.Action.class)))
-        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Exception injectedException = new InterruptedException("Fake interrupted exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
     Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
         .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
-        .withStackTraceContaining("Fake interrupted exception");
+        .withMessageContaining("Interrupted while listing schemas for scope 'DATABASE: 'DB_1''")
+        .withCause(injectedException);
   }
 
   /**
@@ -367,15 +562,91 @@ public class JdbcSnowflakeClientTest {
 
   /**
    * Any unexpected SQLException from the underlying connection will propagate out as an
-   * UncheckedSQLException when listing tables.
+   * UncheckedSQLException when listing tables at Root level.
    */
   @Test
-  public void testListIcebergTablesSQLException() throws SQLException, InterruptedException {
-    when(mockClientPool.run(any(ClientPool.Action.class)))
-        .thenThrow(new SQLException("Fake SQL exception"));
+  public void testListIcebergTablesSQLExceptionAtRootLevel()
+      throws SQLException, InterruptedException {
+    Exception injectedException =
+        new SQLException(String.format("SQL exception with Error Code %d", 0), "2000", 0, null);
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofRoot()))
+        .withMessageContaining("Failed to list tables for scope 'ROOT: '''")
+        .withCause(injectedException);
+  }
+
+  /**
+   * Any unexpected SQLException with specific error codes from the underlying connection will
+   * propagate out as a NoSuchNamespaceException when listing tables at Database level.
+   */
+  @Test
+  public void testListIcebergTablesSQLExceptionAtDatabaseLevel()
+      throws SQLException, InterruptedException {
+    for (Integer errorCode : DATABASE_NOT_FOUND_ERROR_CODES) {
+      Exception injectedException =
+          new SQLException(
+              String.format("SQL exception with Error Code %d", errorCode),
+              "2000",
+              errorCode,
+              null);
+      when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+      Assertions.assertThatExceptionOfType(NoSuchNamespaceException.class)
+          .isThrownBy(
+              () -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+          .withMessageContaining(
+              String.format(
+                  "Identifier not found: 'DATABASE: 'DB_1''. Underlying exception: 'SQL exception with Error Code %d'",
+                  errorCode))
+          .withCause(injectedException);
+    }
+  }
+
+  /**
+   * Any unexpected SQLException with specific error codes from the underlying connection will
+   * propagate out as a NoSuchNamespaceException when listing tables at Schema level.
+   */
+  @Test
+  public void testListIcebergTablesSQLExceptionAtSchemaLevel()
+      throws SQLException, InterruptedException {
+    for (Integer errorCode : SCHEMA_NOT_FOUND_ERROR_CODES) {
+      Exception injectedException =
+          new SQLException(
+              String.format("SQL exception with Error Code %d", errorCode),
+              "2000",
+              errorCode,
+              null);
+      when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+      Assertions.assertThatExceptionOfType(NoSuchNamespaceException.class)
+          .isThrownBy(
+              () ->
+                  snowflakeClient.listIcebergTables(
+                      SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+          .withMessageContaining(
+              String.format(
+                  "Identifier not found: 'SCHEMA: 'DB_1.SCHEMA_1''. Underlying exception: 'SQL exception with Error Code %d'",
+                  errorCode))
+          .withCause(injectedException);
+    }
+  }
+
+  /**
+   * Any unexpected SQLException without error code from the underlying connection will propagate
+   * out as an UncheckedSQLException when listing tables.
+   */
+  @Test
+  public void testListIcebergTablesSQLExceptionWithoutErrorCode()
+      throws SQLException, InterruptedException {
+    Exception injectedException = new SQLException("Fake SQL exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
     Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
         .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
-        .withStackTraceContaining("Fake SQL exception");
+        .withMessageContaining("Failed to list tables for scope 'DATABASE: 'DB_1''")
+        .withCause(injectedException);
   }
 
   /**
@@ -385,11 +656,13 @@ public class JdbcSnowflakeClientTest {
   @Test
   public void testListIcebergTablesInterruptedException()
       throws SQLException, InterruptedException {
-    when(mockClientPool.run(any(ClientPool.Action.class)))
-        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Exception injectedException = new InterruptedException("Fake interrupted exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
     Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
         .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
-        .withStackTraceContaining("Fake interrupted exception");
+        .withMessageContaining("Interrupted while listing tables for scope 'DATABASE: 'DB_1''")
+        .withCause(injectedException);
   }
 
   /**
@@ -498,20 +771,51 @@ public class JdbcSnowflakeClientTest {
         .withMessageContaining("{\"malformed_no_closing_bracket");
   }
 
+  /**
+   * Any unexpected SQLException with specific error codes from the underlying connection will
+   * propagate out as a NoSuchTableException when getting table metadata.
+   */
+  @Test
+  public void testGetTableMetadataSQLException() throws SQLException, InterruptedException {
+    for (Integer errorCode : TABLE_NOT_FOUND_ERROR_CODES) {
+      Exception injectedException =
+          new SQLException(
+              String.format("SQL exception with Error Code %d", errorCode),
+              "2000",
+              errorCode,
+              null);
+      when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
+      Assertions.assertThatExceptionOfType(NoSuchTableException.class)
+          .isThrownBy(
+              () ->
+                  snowflakeClient.loadTableMetadata(
+                      SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+          .withMessageContaining(
+              String.format(
+                  "Identifier not found: 'TABLE: 'DB_1.SCHEMA_1.TABLE_1''. Underlying exception: 'SQL exception with Error Code %d'",
+                  errorCode))
+          .withCause(injectedException);
+    }
+  }
+
   /**
    * Any unexpected SQLException from the underlying connection will propagate out as an
    * UncheckedSQLException when getting table metadata.
    */
   @Test
-  public void testGetTableMetadataSQLException() throws SQLException, InterruptedException {
-    when(mockClientPool.run(any(ClientPool.Action.class)))
-        .thenThrow(new SQLException("Fake SQL exception"));
+  public void testGetTableMetadataSQLExceptionWithoutErrorCode()
+      throws SQLException, InterruptedException {
+    Exception injectedException = new SQLException("Fake SQL exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
     Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
         .isThrownBy(
             () ->
                 snowflakeClient.loadTableMetadata(
                     SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
-        .withStackTraceContaining("Fake SQL exception");
+        .withMessageContaining("Failed to get table metadata for 'TABLE: 'DB_1.SCHEMA_1.TABLE_1''")
+        .withCause(injectedException);
   }
 
   /**
@@ -520,14 +824,17 @@ public class JdbcSnowflakeClientTest {
    */
   @Test
   public void testGetTableMetadataInterruptedException() throws SQLException, InterruptedException {
-    when(mockClientPool.run(any(ClientPool.Action.class)))
-        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Exception injectedException = new InterruptedException("Fake interrupted exception");
+    when(mockClientPool.run(any(ClientPool.Action.class))).thenThrow(injectedException);
+
     Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
         .isThrownBy(
             () ->
                 snowflakeClient.loadTableMetadata(
                     SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
-        .withStackTraceContaining("Fake interrupted exception");
+        .withMessageContaining(
+            "Interrupted while getting table metadata for 'TABLE: 'DB_1.SCHEMA_1.TABLE_1''")
+        .withCause(injectedException);
   }
 
   /** Calling close() propagates to closing underlying client pool. */