You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/04 17:11:54 UTC
incubator-gobblin git commit: [GOBBLIN-630] Add a concrete
implementation for Postgres writer
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 15f201937 -> a6dfdc6d4
[GOBBLIN-630] Add a concrete implementation for Postgres writer
Closes #2512 from spotvenky/postgres_connector_2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a6dfdc6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a6dfdc6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a6dfdc6d
Branch: refs/heads/master
Commit: a6dfdc6d477fe78b2e80aedcb6554306250b8a59
Parents: 15f2019
Author: Venkatesh Iyer <vi...@linkedin.com>
Authored: Tue Dec 4 09:11:56 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 4 09:11:56 2018 -0800
----------------------------------------------------------------------
.../org/apache/gobblin/writer/Destination.java | 3 +-
.../commands/JdbcWriterCommandsFactory.java | 2 +
.../commands/PostgresBufferedInserter.java | 65 +++++++
.../writer/commands/PostgresWriterCommands.java | 175 +++++++++++++++++++
.../writer/PostgresWriterCommandsTest.java | 135 ++++++++++++++
5 files changed, 379 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java b/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
index 506d4ee..6c370b7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java
@@ -35,7 +35,8 @@ public class Destination {
HDFS,
KAFKA,
MYSQL,
- TERADATA
+ TERADATA,
+ POSTGRES
}
// Type of destination
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
index aaf832f..c650249 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java
@@ -43,6 +43,8 @@ public class JdbcWriterCommandsFactory {
return new MySqlWriterCommands(destination.getProperties(), conn);
case TERADATA:
return new TeradataWriterCommands(destination.getProperties(), conn);
+ case POSTGRES:
+ return new PostgresWriterCommands(destination.getProperties(), conn);
default:
throw new IllegalArgumentException(destination.getType() + " is not supported");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java
new file mode 100644
index 0000000..14f89f1
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcEntryDatum;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * {@link BaseJdbcBufferedInserter} for Postgres that writes each buffered batch with a single multi-row
+ * prepared statement of the form {@code <insert prefix>(?,..),(?,..),...;}.
+ */
+@Slf4j
+public class PostgresBufferedInserter extends BaseJdbcBufferedInserter {
+
+  /** Wraps one row's comma-joined placeholders in parentheses. */
+  private static final String VALUE_FORMAT = "(%s)";
+
+  public PostgresBufferedInserter(State state, Connection conn) {
+    super(state, conn);
+  }
+
+  /**
+   * Builds the prepared-statement text for a batch: the inherited {@code insertStmtPrefix}, followed by
+   * {@code batchSize} comma-separated value tuples (one {@code ?} per entry of {@code columnNames}), and a
+   * trailing ';'. At least one tuple is always emitted, even if {@code batchSize} is less than 1.
+   *
+   * @param batchSize number of row tuples to include
+   * @return the SQL text to prepare
+   */
+  @Override
+  protected String createPrepareStatementStr(int batchSize) {
+    StringBuilder sql = new StringBuilder(this.insertStmtPrefix);
+    // new String[n] is all nulls, so useForNull("?") renders exactly n placeholders.
+    String rowPlaceholders =
+        String.format(VALUE_FORMAT, JOINER_ON_COMMA.useForNull("?").join(new String[this.columnNames.size()]));
+    sql.append(rowPlaceholders);
+    for (int row = 1; row < batchSize; row++) {
+      sql.append(',').append(rowPlaceholders);
+    }
+
+    return sql.append(';').toString();
+  }
+
+  /**
+   * Binds every value of every pending entry, in iteration order, to consecutive 1-based parameter positions
+   * and executes the statement.
+   *
+   * @param pstmt statement prepared from {@link #createPrepareStatementStr(int)}
+   * @return the result of {@link PreparedStatement#execute()}
+   * @throws SQLException if binding or execution fails
+   */
+  @Override
+  protected boolean insertBatch(PreparedStatement pstmt)
+      throws SQLException {
+    int paramIndex = 0;
+    pstmt.clearParameters();
+    for (JdbcEntryData pendingEntry : this.pendingInserts) {
+      for (JdbcEntryDatum datum : pendingEntry) {
+        pstmt.setObject(++paramIndex, datum.getVal());
+      }
+    }
+    // Parameterized so pstmt.toString() (potentially the whole SQL text plus bound values) is only
+    // evaluated when debug logging is actually enabled.
+    log.debug("Executing SQL {}", pstmt);
+    return pstmt.execute();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
new file mode 100644
index 0000000..87e89c3
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * The implementation of JdbcWriterCommands for Postgres.
+ */
+@Slf4j
+public class PostgresWriterCommands implements JdbcWriterCommands {
+
+ private static final String CREATE_TABLE_SQL_FORMAT = "CREATE TABLE %s.%s (LIKE %s.%s)";
+ private static final String SELECT_SQL_FORMAT = "SELECT COUNT(*) FROM %s.%s";
+ private static final String TRUNCATE_TABLE_FORMAT = "TRUNCATE TABLE %s.%s";
+ private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE %s.%s";
+ private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
+ "SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
+ private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
+ private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
+
+ private final JdbcBufferedInserter jdbcBufferedWriter;
+ private final Connection conn;
+
+ public PostgresWriterCommands(State state, Connection conn) {
+ this.conn = conn;
+ this.jdbcBufferedWriter = new PostgresBufferedInserter(state, conn);
+ }
+
+ @Override
+ public void setConnectionParameters(Properties properties, Connection conn)
+ throws SQLException {
+ // Postgres writer always uses one single transaction
+ this.conn.setAutoCommit(false);
+ }
+
+ @Override
+ public void insert(String databaseName, String table, JdbcEntryData jdbcEntryData)
+ throws SQLException {
+ this.jdbcBufferedWriter.insert(databaseName, table, jdbcEntryData);
+ }
+
+ @Override
+ public void flush()
+ throws SQLException {
+ this.jdbcBufferedWriter.flush();
+ }
+
+ @Override
+ public void createTableStructure(String databaseName, String fromStructure, String targetTableName)
+ throws SQLException {
+ String sql = String.format(CREATE_TABLE_SQL_FORMAT, databaseName, targetTableName, databaseName, fromStructure);
+ execute(sql);
+ }
+
+ @Override
+ public boolean isEmpty(String database, String table)
+ throws SQLException {
+ String sql = String.format(SELECT_SQL_FORMAT, database, table);
+ try (PreparedStatement pstmt = this.conn.prepareStatement(sql); ResultSet resultSet = pstmt.executeQuery();) {
+ if (!resultSet.first()) {
+ throw new RuntimeException("Should have received at least one row from SQL " + pstmt);
+ }
+ return 0 == resultSet.getInt(1);
+ }
+ }
+
+ @Override
+ public void truncate(String database, String table)
+ throws SQLException {
+ String sql = String.format(TRUNCATE_TABLE_FORMAT, database, table);
+ execute(sql);
+ }
+
+ @Override
+ public void deleteAll(String database, String table)
+ throws SQLException {
+ String deleteSql = String.format(DELETE_STATEMENT_FORMAT, database, table);
+ execute(deleteSql);
+ }
+
+ @Override
+ public void drop(String database, String table)
+ throws SQLException {
+ log.info("Dropping table " + table);
+ String sql = String.format(DROP_TABLE_SQL_FORMAT, database, table);
+ execute(sql);
+ }
+
+ /**
+ * https://documentation.progress.com/output/DataDirect/DataDirectCloud/index.html#page/queries/postgresql-data-types.html
+ * {@inheritDoc}
+ * @see org.apache.gobblin.writer.commands.JdbcWriterCommands#retrieveDateColumns(java.sql.Connection, java.lang.String)
+ */
+ @Override
+ public Map<String, JdbcType> retrieveDateColumns(String database, String table)
+ throws SQLException {
+ Map<String, JdbcType> targetDataTypes =
+ ImmutableMap.<String, JdbcType>builder().put("DATE", JdbcType.DATE).put("TIME WITH TIME ZONE", JdbcType.TIME)
+ .put("TIME WITHOUT TIME ZONE", JdbcType.TIME).put("TIMESTAMP WITH TIME ZONE", JdbcType.TIMESTAMP)
+ .put("TIMESTAMP WITHOUT TIME ZONE", JdbcType.TIMESTAMP).build();
+
+ ImmutableMap.Builder<String, JdbcType> dateColumnsBuilder = ImmutableMap.builder();
+
+ try (PreparedStatement pstmt = this.conn
+ .prepareStatement(INFORMATION_SCHEMA_SELECT_SQL_PSTMT, ResultSet.TYPE_SCROLL_INSENSITIVE,
+ ResultSet.CONCUR_READ_ONLY)) {
+ pstmt.setString(1, database);
+ pstmt.setString(2, table);
+ log.info("Retrieving column type information from SQL: " + pstmt);
+ try (ResultSet rs = pstmt.executeQuery()) {
+ if (!rs.first()) {
+ throw new IllegalArgumentException("No result from information_schema.columns");
+ }
+ do {
+ String type = rs.getString("data_type").toUpperCase();
+ JdbcType convertedType = targetDataTypes.get(type);
+ if (convertedType != null) {
+ dateColumnsBuilder.put(rs.getString("column_name"), convertedType);
+ }
+ } while (rs.next());
+ }
+ }
+ return dateColumnsBuilder.build();
+ }
+
+ @Override
+ public void copyTable(String databaseName, String from, String to)
+ throws SQLException {
+ String sql = String.format(COPY_INSERT_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+ execute(sql);
+ }
+
+ private void execute(String sql)
+ throws SQLException {
+ log.info("Executing SQL " + sql);
+ try (PreparedStatement pstmt = this.conn.prepareStatement(sql)) {
+ pstmt.execute();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("PostgresWriterCommands [bufferedWriter=%s]", this.jdbcBufferedWriter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
new file mode 100644
index 0000000..ac46d4d
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+import org.apache.gobblin.writer.commands.PostgresWriterCommands;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.sun.rowset.JdbcRowSetImpl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@Test(groups = {"gobblin.writer"})
+public class PostgresWriterCommandsTest {
+
+  /**
+   * Verifies that retrieveDateColumns keeps only the date/time columns reported by information_schema and
+   * maps each one to its JdbcType, ignoring non-temporal (varchar) columns.
+   */
+  @Test
+  public void testPostgresDateTypeRetrieval()
+      throws SQLException {
+    Connection conn = mock(Connection.class);
+
+    PreparedStatement pstmt = mock(PreparedStatement.class);
+    when(conn.prepareStatement(any(String.class), any(Integer.class), any(Integer.class))).thenReturn(pstmt);
+
+    ResultSet rs = createMockResultSet();
+    when(pstmt.executeQuery()).thenReturn(rs);
+
+    PostgresWriterCommands writerCommands = new PostgresWriterCommands(new State(), conn);
+    Map<String, JdbcType> actual = writerCommands.retrieveDateColumns("db", "users");
+
+    Map<String, JdbcType> expected = ImmutableMap.of(
+        "date_of_birth", JdbcType.DATE,
+        "last_modified", JdbcType.TIME,
+        "created", JdbcType.TIMESTAMP);
+
+    // TestNG's Assert.assertEquals takes (actual, expected); reversing them produces misleading failure output.
+    Assert.assertEquals(actual, expected);
+  }
+
+  /** Builds one information_schema.columns row keyed by the labels retrieveDateColumns reads. */
+  private static Map<String, String> column(String name, String dataType) {
+    Map<String, String> row = new HashMap<>();
+    row.put("column_name", name);
+    row.put("data_type", dataType);
+    return row;
+  }
+
+  /**
+   * Returns a ResultSet over a fixed list of columns that implements only first(), next() and
+   * getString(String).
+   * NOTE(review): com.sun.rowset.JdbcRowSetImpl is an internal JDK class that is not exported on JDK 9+;
+   * a Mockito-backed ResultSet would be more portable.
+   */
+  private ResultSet createMockResultSet() {
+    final List<Map<String, String>> expected = new ArrayList<>();
+    expected.add(column("name", "varchar"));
+    expected.add(column("favorite_number", "varchar"));
+    expected.add(column("favorite_color", "varchar"));
+    expected.add(column("date_of_birth", "date"));
+    expected.add(column("last_modified", "time without time zone"));
+    expected.add(column("created", "timestamp with time zone"));
+
+    return new JdbcRowSetImpl() {
+      private Iterator<Map<String, String>> it = expected.iterator();
+      private Map<String, String> curr = null;
+
+      @Override
+      public boolean first() {
+        it = expected.iterator();
+        return next();
+      }
+
+      @Override
+      public boolean next() {
+        if (it.hasNext()) {
+          curr = it.next();
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public String getString(String columnLabel)
+          throws SQLException {
+        if (curr == null) {
+          throw new SQLException("NPE on current cursor.");
+        }
+        String val = curr.get(columnLabel);
+        if (val == null) {
+          throw new SQLException(columnLabel + " does not exist.");
+        }
+        return val;
+      }
+    };
+  }
+}