You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/12/09 22:37:41 UTC

[gobblin] branch master updated: [GOBBLIN-1584] Add replace record logic for Mysql writer (#3438)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 65a3649  [GOBBLIN-1584] Add replace record logic for Mysql writer (#3438)
65a3649 is described below

commit 65a3649474be1d8b5203c8cacb16b8e1bcd5455b
Author: umustafi <um...@gmail.com>
AuthorDate: Thu Dec 9 14:37:12 2021 -0800

    [GOBBLIN-1584] Add replace record logic for Mysql writer (#3438)
    
    * [GOBBLIN-1584] Add replace record logic for Mysql writer
    
    * Allows user to configure a mysql ingestion job to allow replacement of the record's value for an existing record in the table.
    * Also replaces a backward iteration of ResultSet() with forward iteration in MySqlWriterCommands
    
    * remove extra line
    
    * refactor to improve usage of overwrite data field, adjust tests accordingly
    
    * improve documentation and throwing errors
    
    * modify exception for TeradataWriterCommands
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
 .../gobblin/configuration/ConfigurationKeys.java   |  5 ++
 .../writer/commands/BaseJdbcBufferedInserter.java  |  3 +-
 .../writer/commands/JdbcWriterCommandsFactory.java |  9 ++--
 .../writer/commands/MySqlBufferedInserter.java     | 24 +++++++++-
 .../writer/commands/MySqlWriterCommands.java       | 22 +++++----
 .../writer/commands/PostgresWriterCommands.java    | 14 ++++--
 .../writer/commands/TeradataWriterCommands.java    | 18 ++++---
 .../gobblin/writer/JdbcWriterCommandsTest.java     |  2 +-
 .../gobblin/writer/MySqlBufferedInserterTest.java  | 56 +++++++++++++++-------
 .../gobblin/writer/PostgresWriterCommandsTest.java |  2 +-
 10 files changed, 111 insertions(+), 44 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c838e91..0ec72c3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -217,6 +217,11 @@ public class ConfigurationKeys {
   public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);
 
   /**
+   * Optional property to specify whether existing data in databases can be overwritten during ingestion jobs
+   */
+  public static final String ALLOW_JDBC_RECORD_OVERWRITE = "allow.jdbc.record.overwrite";
+
+  /**
    * Optional property to specify a default Authenticator class for a job
    */
   public static final String DEFAULT_AUTHENTICATOR_CLASS = "job.default.authenticator.class";
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java
index 82fb4ad..ac71ff8 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java
@@ -34,10 +34,11 @@ import com.github.rholder.retry.WaitStrategies;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
+import lombok.ToString;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.converter.jdbc.JdbcEntryData;
 import org.apache.gobblin.converter.jdbc.JdbcEntryDatum;
-import lombok.ToString;
 
 
 /**
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
index c650249..fdc24ba 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
@@ -38,13 +38,16 @@ public class JdbcWriterCommandsFactory {
    * @return Provides JdbcWriterCommands bases on destination.
    */
   public JdbcWriterCommands newInstance(Destination destination, Connection conn) {
+
+    boolean overwriteRecords = destination.getProperties().getPropAsBoolean(ConfigurationKeys.ALLOW_JDBC_RECORD_OVERWRITE);
+
     switch (destination.getType()) {
       case MYSQL:
-        return new MySqlWriterCommands(destination.getProperties(), conn);
+        return new MySqlWriterCommands(destination.getProperties(), conn, overwriteRecords);
       case TERADATA:
-        return new TeradataWriterCommands(destination.getProperties(), conn);
+        return new TeradataWriterCommands(destination.getProperties(), conn, overwriteRecords);
       case POSTGRES:
-        return new PostgresWriterCommands(destination.getProperties(), conn);
+        return new PostgresWriterCommands(destination.getProperties(), conn, overwriteRecords);
       default:
         throw new IllegalArgumentException(destination.getType() + " is not supported");
     }
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlBufferedInserter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlBufferedInserter.java
index e44ba60..48d597b 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlBufferedInserter.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlBufferedInserter.java
@@ -33,18 +33,30 @@ import lombok.ToString;
 /**
  * The implementation of JdbcBufferedInserter for MySQL.
  * This purpose of buffered insert is mainly for performance reason and the implementation is based on the
- * reference manual http://dev.mysql.com/doc/refman/5.0/en/insert-speed.html
+ * reference manual https://dev.mysql.com/doc/refman/8.0/en/
+ *
+ * This class supports two types of insertions for MySQL 1) standard insertion - only supports records with unique
+ * primary keys and fails on attempted insertion of a duplicate record 2) replace insertion - inserts new records as
+ * normal but allows for value overwrites for duplicate inserts (by primary key)
+ *
+ * Note that replacement occurs at 'record-level', so if there are duplicates in the same input then they will replace
+ * each other in a non-deterministic order.
  */
 @ToString
 public class MySqlBufferedInserter extends BaseJdbcBufferedInserter {
 
   private static final Logger LOG = LoggerFactory.getLogger(MySqlBufferedInserter.class);
 
+  protected static final String REPLACE_STATEMENT_PREFIX_FORMAT = "REPLACE INTO %s.%s (%s) VALUES ";
+
   private final int maxParamSize;
 
-  public MySqlBufferedInserter(State state, Connection conn) {
+  private final boolean overwriteRecords;
+
+  public MySqlBufferedInserter(State state, Connection conn, boolean overwriteRecords) {
     super(state, conn);
     this.maxParamSize = state.getPropAsInt(WRITER_JDBC_MAX_PARAM_SIZE, DEFAULT_WRITER_JDBC_MAX_PARAM_SIZE);
+    this.overwriteRecords = overwriteRecords;
   }
 
   @Override
@@ -86,4 +98,12 @@ public class MySqlBufferedInserter extends BaseJdbcBufferedInserter {
     this.batchSize = actualBatchSize;
     super.initializeBatch(databaseName, table);
   }
+
+  @Override
+  /**
+   * Use separate insertion statement if data overwrites are allowed
+   */
+  protected String createInsertStatementStr(String databaseName, String table) {
+    return String.format(this.overwriteRecords ? REPLACE_STATEMENT_PREFIX_FORMAT : INSERT_STATEMENT_PREFIX_FORMAT, databaseName, table, JOINER_ON_COMMA.join(this.columnNames));
+  }
 }
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
index 01264e0..922ceac 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.writer.commands;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.converter.jdbc.JdbcType;
-import org.apache.gobblin.converter.jdbc.JdbcEntryData;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -33,6 +29,10 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+
 
 /**
  * The implementation of JdbcWriterCommands for MySQL.
@@ -47,14 +47,17 @@ public class MySqlWriterCommands implements JdbcWriterCommands {
   private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
       "SELECT column_name, column_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
   private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
+  private static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO %s.%s SELECT * FROM %s.%s";
   private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
 
   private final JdbcBufferedInserter jdbcBufferedWriter;
   private final Connection conn;
+  private final boolean overwriteRecords;
 
-  public MySqlWriterCommands(State state, Connection conn) {
+  public MySqlWriterCommands(State state, Connection conn, boolean overwriteRecords) {
     this.conn = conn;
-    this.jdbcBufferedWriter = new MySqlBufferedInserter(state, conn);
+    this.jdbcBufferedWriter = new MySqlBufferedInserter(state, conn, overwriteRecords);
+    this.overwriteRecords = overwriteRecords;
   }
 
   @Override
@@ -128,7 +131,7 @@ public class MySqlWriterCommands implements JdbcWriterCommands {
       pstmt.setString(2, table);
       LOG.info("Retrieving column type information from SQL: " + pstmt);
       try (ResultSet rs = pstmt.executeQuery()) {
-        if (!rs.first()) {
+        if (!rs.next()) {
           throw new IllegalArgumentException("No result from information_schema.columns");
         }
         do {
@@ -145,7 +148,10 @@ public class MySqlWriterCommands implements JdbcWriterCommands {
 
   @Override
   public void copyTable(String databaseName, String from, String to) throws SQLException {
-    String sql = String.format(COPY_INSERT_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+    // Chooses between INSERT and REPLACE logic based on the job configurations
+    String sql = String
+        .format(this.overwriteRecords ? COPY_REPLACE_STATEMENT_FORMAT : COPY_INSERT_STATEMENT_FORMAT, databaseName,
+            to, databaseName, from);
     execute(sql);
   }
 
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
index baa2b44..301ef91 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
@@ -24,14 +24,14 @@ import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.converter.jdbc.JdbcEntryData;
-import org.apache.gobblin.converter.jdbc.JdbcType;
-
 import com.google.common.collect.ImmutableMap;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+
 
 /**
  * The implementation of JdbcWriterCommands for Postgres.
@@ -51,7 +51,11 @@ public class PostgresWriterCommands implements JdbcWriterCommands {
   private final JdbcBufferedInserter jdbcBufferedWriter;
   private final Connection conn;
 
-  public PostgresWriterCommands(State state, Connection conn) {
+  public PostgresWriterCommands(State state, Connection conn, boolean overwriteRecords) throws UnsupportedOperationException {
+    if (overwriteRecords) {
+      throw new IllegalArgumentException("Replace existing records is not supported in PostgresWriterCommands");
+    }
+
     this.conn = conn;
     this.jdbcBufferedWriter = new PostgresBufferedInserter(state, conn);
   }
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/TeradataWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/TeradataWriterCommands.java
index 7891193..dd3022a 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/TeradataWriterCommands.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/TeradataWriterCommands.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.writer.commands;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.converter.jdbc.JdbcType;
-import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.gobblin.converter.jdbc.JdbcEntryData;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -35,6 +29,12 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+
 
 /**
  * The implementation of JdbcWriterCommands for Teradata.
@@ -62,7 +62,11 @@ public class TeradataWriterCommands implements JdbcWriterCommands {
   private final JdbcBufferedInserter jdbcBufferedWriter;
   private final Connection conn;
 
-  public TeradataWriterCommands(State state, Connection conn) {
+  public TeradataWriterCommands(State state, Connection conn, boolean overwriteRecords) throws UnsupportedOperationException {
+    if (overwriteRecords) {
+      throw new IllegalArgumentException("Replace existing records is not supported in TeradataWriterCommands");
+    }
+
     this.conn = conn;
     this.jdbcBufferedWriter = new TeradataBufferedInserter(state, conn);
   }
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterCommandsTest.java
index b8e2c1c..91afc8e 100644
--- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterCommandsTest.java
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterCommandsTest.java
@@ -50,7 +50,7 @@ public class JdbcWriterCommandsTest {
     ResultSet rs = createMockResultSet();
     when(pstmt.executeQuery()).thenReturn(rs);
 
-    MySqlWriterCommands writerCommands = new MySqlWriterCommands(new State(), conn);
+    MySqlWriterCommands writerCommands = new MySqlWriterCommands(new State(), conn, false);
     Map<String, JdbcType> actual = writerCommands.retrieveDateColumns("db", "users");
 
     ImmutableMap.Builder<String, JdbcType> builder = ImmutableMap.builder();
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/MySqlBufferedInserterTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/MySqlBufferedInserterTest.java
index b33be6a..55a5fe1 100644
--- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/MySqlBufferedInserterTest.java
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/MySqlBufferedInserterTest.java
@@ -17,17 +17,6 @@
 
 package org.apache.gobblin.writer;
 
-import static org.apache.gobblin.writer.commands.JdbcBufferedInserter.WRITER_JDBC_INSERT_BATCH_SIZE;
-import static org.apache.gobblin.writer.commands.JdbcBufferedInserter.WRITER_JDBC_MAX_PARAM_SIZE;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -40,6 +29,14 @@ import org.apache.gobblin.converter.jdbc.JdbcEntryData;
 import org.apache.gobblin.writer.commands.JdbcBufferedInserter;
 import org.apache.gobblin.writer.commands.MySqlBufferedInserter;
 
+import static org.apache.gobblin.writer.commands.JdbcBufferedInserter.WRITER_JDBC_INSERT_BATCH_SIZE;
+import static org.apache.gobblin.writer.commands.JdbcBufferedInserter.WRITER_JDBC_MAX_PARAM_SIZE;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.*;
+
 @Test(groups = {"gobblin.writer"}, singleThreaded=true)
 public class MySqlBufferedInserterTest extends JdbcBufferedInserterTestBase {
 
@@ -52,7 +49,34 @@ public class MySqlBufferedInserterTest extends JdbcBufferedInserterTestBase {
     State state = new State();
     state.setProp(WRITER_JDBC_INSERT_BATCH_SIZE, Integer.toString(batchSize));
 
-    JdbcBufferedInserter inserter = getJdbcBufferedInserter(state, conn);
+    MySqlBufferedInserter inserter = new MySqlBufferedInserter(state, conn, false);
+
+    PreparedStatement pstmt = mock(PreparedStatement.class);
+    when(conn.prepareStatement(anyString())).thenReturn(pstmt);
+
+    List<JdbcEntryData> jdbcEntries = createJdbcEntries(colNums, colSize, entryCount);
+    for(JdbcEntryData entry : jdbcEntries) {
+      inserter.insert(db, table, entry);
+    }
+    inserter.flush();
+
+    verify(conn, times(2)).prepareStatement(matches("INSERT INTO .*"));
+    verify(pstmt, times(11)).clearParameters();
+    verify(pstmt, times(11)).execute();
+    verify(pstmt, times(colNums * entryCount)).setObject(anyInt(), anyObject());
+    reset(pstmt);
+  }
+
+  public void testMySqlBufferedReplace() throws SQLException {
+    final int colNums = 20;
+    final int batchSize = 10;
+    final int entryCount = 107;
+    final int colSize = 7;
+
+    State state = new State();
+    state.setProp(WRITER_JDBC_INSERT_BATCH_SIZE, Integer.toString(batchSize));
+
+    MySqlBufferedInserter inserter = new MySqlBufferedInserter(state, conn, true);
 
     PreparedStatement pstmt = mock(PreparedStatement.class);
     when(conn.prepareStatement(anyString())).thenReturn(pstmt);
@@ -63,7 +87,7 @@ public class MySqlBufferedInserterTest extends JdbcBufferedInserterTestBase {
     }
     inserter.flush();
 
-    verify(conn, times(2)).prepareStatement(anyString());
+    verify(conn, times(2)).prepareStatement(matches("REPLACE INTO .*"));
     verify(pstmt, times(11)).clearParameters();
     verify(pstmt, times(11)).execute();
     verify(pstmt, times(colNums * entryCount)).setObject(anyInt(), anyObject());
@@ -81,7 +105,7 @@ public class MySqlBufferedInserterTest extends JdbcBufferedInserterTestBase {
     state.setProp(WRITER_JDBC_INSERT_BATCH_SIZE, Integer.toString(batchSize));
     state.setProp(WRITER_JDBC_MAX_PARAM_SIZE, maxParamSize);
 
-    MySqlBufferedInserter inserter = new MySqlBufferedInserter(state, conn);
+    MySqlBufferedInserter inserter = new MySqlBufferedInserter(state, conn, false);
 
     PreparedStatement pstmt = mock(PreparedStatement.class);
     when(conn.prepareStatement(anyString())).thenReturn(pstmt);
@@ -94,7 +118,7 @@ public class MySqlBufferedInserterTest extends JdbcBufferedInserterTestBase {
 
     int expectedBatchSize = maxParamSize / colNums;
     int expectedExecuteCount = entryCount / expectedBatchSize + 1;
-    verify(conn, times(2)).prepareStatement(anyString());
+    verify(conn, times(2)).prepareStatement(matches("INSERT INTO .*"));
     verify(pstmt, times(expectedExecuteCount)).clearParameters();
     verify(pstmt, times(expectedExecuteCount)).execute();
     verify(pstmt, times(colNums * entryCount)).setObject(anyInt(), anyObject());
@@ -103,6 +127,6 @@ public class MySqlBufferedInserterTest extends JdbcBufferedInserterTestBase {
 
   @Override
   protected JdbcBufferedInserter getJdbcBufferedInserter(State state, Connection conn) {
-    return new MySqlBufferedInserter(state, conn);
+    return new MySqlBufferedInserter(state, conn, false);
   }
 }
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
index ac46d4d..512602b 100644
--- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
@@ -54,7 +54,7 @@ public class PostgresWriterCommandsTest {
     ResultSet rs = createMockResultSet();
     when(pstmt.executeQuery()).thenReturn(rs);
 
-    PostgresWriterCommands writerCommands = new PostgresWriterCommands(new State(), conn);
+    PostgresWriterCommands writerCommands = new PostgresWriterCommands(new State(), conn, false);
     Map<String, JdbcType> actual = writerCommands.retrieveDateColumns("db", "users");
 
     ImmutableMap.Builder<String, JdbcType> builder = ImmutableMap.builder();