You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/04 17:11:54 UTC

incubator-gobblin git commit: [GOBBLIN-630] Add a concrete implementation for Postgres writer

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 15f201937 -> a6dfdc6d4


[GOBBLIN-630] Add a concrete implementation for Postgres writer

Closes #2512 from spotvenky/postgres_connector_2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a6dfdc6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a6dfdc6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a6dfdc6d

Branch: refs/heads/master
Commit: a6dfdc6d477fe78b2e80aedcb6554306250b8a59
Parents: 15f2019
Author: Venkatesh Iyer <vi...@linkedin.com>
Authored: Tue Dec 4 09:11:56 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 4 09:11:56 2018 -0800

----------------------------------------------------------------------
 .../org/apache/gobblin/writer/Destination.java  |   3 +-
 .../commands/JdbcWriterCommandsFactory.java     |   2 +
 .../commands/PostgresBufferedInserter.java      |  65 +++++++
 .../writer/commands/PostgresWriterCommands.java | 175 +++++++++++++++++++
 .../writer/PostgresWriterCommandsTest.java      | 135 ++++++++++++++
 5 files changed, 379 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java b/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
index 506d4ee..6c370b7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
@@ -35,7 +35,8 @@ public class Destination {
     HDFS,
     KAFKA,
     MYSQL,
-    TERADATA
+    TERADATA,
+    POSTGRES
   }
 
   // Type of destination

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
index aaf832f..c650249 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
@@ -43,6 +43,8 @@ public class JdbcWriterCommandsFactory {
         return new MySqlWriterCommands(destination.getProperties(), conn);
       case TERADATA:
         return new TeradataWriterCommands(destination.getProperties(), conn);
+      case POSTGRES:
+        return new PostgresWriterCommands(destination.getProperties(), conn);
       default:
         throw new IllegalArgumentException(destination.getType() + " is not supported");
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java
new file mode 100644
index 0000000..14f89f1
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcEntryDatum;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * {@link BaseJdbcBufferedInserter} for Postgres that sends each buffered batch as a single
+ * multi-row statement: the inherited insert prefix followed by one {@code (?, ..., ?)} tuple per row.
+ */
+@Slf4j
+public class PostgresBufferedInserter extends BaseJdbcBufferedInserter {
+
+  public PostgresBufferedInserter(State state, Connection conn) {
+    super(state, conn);
+  }
+
+  /**
+   * Builds the parameterized SQL for a batch of {@code batchSize} rows: {@code insertStmtPrefix},
+   * then {@code batchSize} comma-separated {@code (?, ?, ...)} tuples (one placeholder per column),
+   * terminated by {@code ;}.
+   *
+   * @param batchSize number of row tuples to emit
+   * @return the SQL text to prepare
+   */
+  @Override
+  protected String createPrepareStatementStr(int batchSize) {
+    final String VALUE_FORMAT = "(%s)";
+    StringBuilder sb = new StringBuilder(this.insertStmtPrefix);
+    // A String[] of nulls joined with useForNull("?") yields "?,?,...,?" with one ? per column.
+    String values =
+        String.format(VALUE_FORMAT, JOINER_ON_COMMA.useForNull("?").join(new String[this.columnNames.size()]));
+    sb.append(values);
+    for (int i = 1; i < batchSize; i++) {
+      sb.append(',').append(values);
+    }
+
+    return sb.append(';').toString();
+  }
+
+  /**
+   * Binds every value of every pending entry, in iteration order, to consecutive positional
+   * parameters of {@code pstmt} and executes it.
+   *
+   * @param pstmt statement prepared from {@link #createPrepareStatementStr(int)}
+   * @return the value returned by {@link PreparedStatement#execute()}
+   * @throws SQLException if binding or execution fails
+   */
+  @Override
+  protected boolean insertBatch(PreparedStatement pstmt)
+      throws SQLException {
+    int paramIndex = 0;
+    pstmt.clearParameters();
+    for (JdbcEntryData pendingEntry : this.pendingInserts) {
+      for (JdbcEntryDatum datum : pendingEntry) {
+        pstmt.setObject(++paramIndex, datum.getVal());
+      }
+    }
+    // Parameterized logging: the statement's toString() may render the full SQL with all bound
+    // values, so only pay for it when debug logging is actually enabled.
+    log.debug("Executing SQL {}", pstmt);
+    return pstmt.execute();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
new file mode 100644
index 0000000..87e89c3
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * The implementation of JdbcWriterCommands for Postgres.
+ *
+ * <p>The {@code database} argument of each command is used as the Postgres schema name
+ * ({@code schema.table} in generated SQL, {@code table_schema} in the information_schema query).
+ * NOTE(review): schema/table identifiers are interpolated into SQL unquoted, so they are presumably
+ * trusted job-configuration values; confirm before passing anything user-controlled.
+ */
+@Slf4j
+public class PostgresWriterCommands implements JdbcWriterCommands {
+
+  private static final String CREATE_TABLE_SQL_FORMAT = "CREATE TABLE %s.%s (LIKE %s.%s)";
+  private static final String SELECT_SQL_FORMAT = "SELECT COUNT(*) FROM %s.%s";
+  private static final String TRUNCATE_TABLE_FORMAT = "TRUNCATE TABLE %s.%s";
+  private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE %s.%s";
+  private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
+      "SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
+  private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
+  private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
+
+  /**
+   * Upper-cased {@code information_schema.columns.data_type} values treated as date/time columns,
+   * mapped to the {@link JdbcType} reported for them. Immutable, so built once and shared.
+   */
+  private static final Map<String, JdbcType> DATE_TIME_TYPES = ImmutableMap.<String, JdbcType>builder()
+      .put("DATE", JdbcType.DATE)
+      .put("TIME WITH TIME ZONE", JdbcType.TIME)
+      .put("TIME WITHOUT TIME ZONE", JdbcType.TIME)
+      .put("TIMESTAMP WITH TIME ZONE", JdbcType.TIMESTAMP)
+      .put("TIMESTAMP WITHOUT TIME ZONE", JdbcType.TIMESTAMP)
+      .build();
+
+  private final JdbcBufferedInserter jdbcBufferedWriter;
+  private final Connection conn;
+
+  /**
+   * @param state job state passed to the {@link PostgresBufferedInserter}
+   * @param conn connection on which every statement of this instance is issued
+   */
+  public PostgresWriterCommands(State state, Connection conn) {
+    this.conn = conn;
+    this.jdbcBufferedWriter = new PostgresBufferedInserter(state, conn);
+  }
+
+  /**
+   * Disables auto-commit. Note that this acts on the connection given to the constructor;
+   * the {@code conn} argument is not used.
+   */
+  @Override
+  public void setConnectionParameters(Properties properties, Connection conn)
+      throws SQLException {
+    // Postgres writer always uses one single transaction
+    this.conn.setAutoCommit(false);
+  }
+
+  /** Buffers {@code jdbcEntryData} in the {@link PostgresBufferedInserter}. */
+  @Override
+  public void insert(String databaseName, String table, JdbcEntryData jdbcEntryData)
+      throws SQLException {
+    this.jdbcBufferedWriter.insert(databaseName, table, jdbcEntryData);
+  }
+
+  /** Flushes any rows still buffered in the {@link PostgresBufferedInserter}. */
+  @Override
+  public void flush()
+      throws SQLException {
+    this.jdbcBufferedWriter.flush();
+  }
+
+  /** Creates {@code databaseName.targetTableName} with {@code CREATE TABLE ... (LIKE fromStructure)}. */
+  @Override
+  public void createTableStructure(String databaseName, String fromStructure, String targetTableName)
+      throws SQLException {
+    String sql = String.format(CREATE_TABLE_SQL_FORMAT, databaseName, targetTableName, databaseName, fromStructure);
+    execute(sql);
+  }
+
+  /**
+   * Returns whether {@code database.table} has no rows, based on {@code SELECT COUNT(*)}.
+   *
+   * @throws RuntimeException if the count query unexpectedly returns no row
+   */
+  @Override
+  public boolean isEmpty(String database, String table)
+      throws SQLException {
+    String sql = String.format(SELECT_SQL_FORMAT, database, table);
+    try (PreparedStatement pstmt = this.conn.prepareStatement(sql); ResultSet resultSet = pstmt.executeQuery()) {
+      // The statement uses the default TYPE_FORWARD_ONLY result set, on which first() is not allowed
+      // (the Postgres driver throws); next() moves onto the single COUNT(*) row instead.
+      if (!resultSet.next()) {
+        throw new RuntimeException("Should have received at least one row from SQL " + pstmt);
+      }
+      return 0 == resultSet.getInt(1);
+    }
+  }
+
+  /** Runs {@code TRUNCATE TABLE database.table}. */
+  @Override
+  public void truncate(String database, String table)
+      throws SQLException {
+    String sql = String.format(TRUNCATE_TABLE_FORMAT, database, table);
+    execute(sql);
+  }
+
+  /** Runs {@code DELETE FROM database.table}. */
+  @Override
+  public void deleteAll(String database, String table)
+      throws SQLException {
+    String deleteSql = String.format(DELETE_STATEMENT_FORMAT, database, table);
+    execute(deleteSql);
+  }
+
+  /** Runs {@code DROP TABLE database.table}. */
+  @Override
+  public void drop(String database, String table)
+      throws SQLException {
+    log.info("Dropping table " + table);
+    String sql = String.format(DROP_TABLE_SQL_FORMAT, database, table);
+    execute(sql);
+  }
+
+  /**
+   * https://documentation.progress.com/output/DataDirect/DataDirectCloud/index.html#page/queries/postgresql-data-types.html
+   * {@inheritDoc}
+   *
+   * <p>Reads {@code column_name}/{@code data_type} from {@code information_schema.columns} for the
+   * given schema and table and keeps only columns whose upper-cased type is a date/time type.
+   *
+   * @throws IllegalArgumentException if {@code information_schema.columns} returns no rows
+   * @see org.apache.gobblin.writer.commands.JdbcWriterCommands#retrieveDateColumns(java.lang.String, java.lang.String)
+   */
+  @Override
+  public Map<String, JdbcType> retrieveDateColumns(String database, String table)
+      throws SQLException {
+    ImmutableMap.Builder<String, JdbcType> dateColumnsBuilder = ImmutableMap.builder();
+
+    try (PreparedStatement pstmt = this.conn
+        .prepareStatement(INFORMATION_SCHEMA_SELECT_SQL_PSTMT, ResultSet.TYPE_SCROLL_INSENSITIVE,
+            ResultSet.CONCUR_READ_ONLY)) {
+      pstmt.setString(1, database);
+      pstmt.setString(2, table);
+      log.info("Retrieving column type information from SQL: " + pstmt);
+      try (ResultSet rs = pstmt.executeQuery()) {
+        if (!rs.first()) {
+          throw new IllegalArgumentException("No result from information_schema.columns");
+        }
+        do {
+          // Locale.ROOT: a default-locale toUpperCase() (e.g. Turkish) would turn "time" into "TİME"
+          // and silently miss the lookup.
+          String type = rs.getString("data_type").toUpperCase(Locale.ROOT);
+          JdbcType convertedType = DATE_TIME_TYPES.get(type);
+          if (convertedType != null) {
+            dateColumnsBuilder.put(rs.getString("column_name"), convertedType);
+          }
+        } while (rs.next());
+      }
+    }
+    return dateColumnsBuilder.build();
+  }
+
+  /** Copies all rows with {@code INSERT INTO databaseName.to SELECT * FROM databaseName.from}. */
+  @Override
+  public void copyTable(String databaseName, String from, String to)
+      throws SQLException {
+    String sql = String.format(COPY_INSERT_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+    execute(sql);
+  }
+
+  /** Logs and executes {@code sql} in a short-lived prepared statement. */
+  private void execute(String sql)
+      throws SQLException {
+    log.info("Executing SQL " + sql);
+    try (PreparedStatement pstmt = this.conn.prepareStatement(sql)) {
+      pstmt.execute();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("PostgresWriterCommands [bufferedWriter=%s]", this.jdbcBufferedWriter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
new file mode 100644
index 0000000..ac46d4d
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+import org.apache.gobblin.writer.commands.PostgresWriterCommands;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.sun.rowset.JdbcRowSetImpl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@Test(groups = {"gobblin.writer"})
+public class PostgresWriterCommandsTest {
+
+  /**
+   * Verifies that only the date/time columns reported by information_schema are returned, each
+   * mapped to the expected {@link JdbcType}.
+   */
+  @Test
+  public void testPostgresDateTypeRetrieval()
+      throws SQLException {
+    ResultSet columnRows = createMockResultSet();
+
+    PreparedStatement statement = mock(PreparedStatement.class);
+    when(statement.executeQuery()).thenReturn(columnRows);
+
+    Connection connection = mock(Connection.class);
+    when(connection.prepareStatement(any(String.class), any(Integer.class), any(Integer.class))).thenReturn(statement);
+
+    Map<String, JdbcType> actual =
+        new PostgresWriterCommands(new State(), connection).retrieveDateColumns("db", "users");
+
+    Map<String, JdbcType> expected = ImmutableMap.<String, JdbcType>builder()
+        .put("date_of_birth", JdbcType.DATE)
+        .put("last_modified", JdbcType.TIME)
+        .put("created", JdbcType.TIMESTAMP)
+        .build();
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Builds a fake result set of (column_name, data_type) rows, mimicking information_schema.columns.
+   * NOTE(review): JdbcRowSetImpl is an internal JDK class (com.sun.rowset); consider a Mockito- or
+   * Proxy-based ResultSet if this must compile on newer JDKs.
+   */
+  private ResultSet createMockResultSet() {
+    final List<Map<String, String>> columnRows = new ArrayList<>();
+    columnRows.add(columnRow("name", "varchar"));
+    columnRows.add(columnRow("favorite_number", "varchar"));
+    columnRows.add(columnRow("favorite_color", "varchar"));
+    columnRows.add(columnRow("date_of_birth", "date"));
+    columnRows.add(columnRow("last_modified", "time without time zone"));
+    columnRows.add(columnRow("created", "timestamp with time zone"));
+
+    return new JdbcRowSetImpl() {
+      private Iterator<Map<String, String>> cursor = columnRows.iterator();
+      private Map<String, String> current = null;
+
+      @Override
+      public boolean first() {
+        // Rewind, then step onto the first row.
+        cursor = columnRows.iterator();
+        return next();
+      }
+
+      @Override
+      public boolean next() {
+        if (!cursor.hasNext()) {
+          return false;
+        }
+        current = cursor.next();
+        return true;
+      }
+
+      @Override
+      public String getString(String columnLabel)
+          throws SQLException {
+        if (current == null) {
+          throw new SQLException("NPE on current cursor.");
+        }
+        String value = current.get(columnLabel);
+        if (value == null) {
+          throw new SQLException(columnLabel + " does not exist.");
+        }
+        return value;
+      }
+    };
+  }
+
+  /** Returns a mutable row with the given {@code column_name} and {@code data_type} values. */
+  private static Map<String, String> columnRow(String columnName, String dataType) {
+    Map<String, String> row = new HashMap<>();
+    row.put("column_name", columnName);
+    row.put("data_type", dataType);
+    return row;
+  }
+}