Posted to issues@hawq.apache.org by leskin-in <gi...@git.apache.org> on 2018/04/09 08:06:32 UTC

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

GitHub user leskin-in opened a pull request:

    https://github.com/apache/incubator-hawq/pull/1353

    HAWQ-1605. Support INSERT in PXF JDBC plugin

    Add support for INSERT queries to the PXF JDBC plugin:
    * Implement the `WriteAccessor` interface (in the `JdbcAccessor` class) and the `WriteResolver` interface (in the `JdbcResolver` class);
    * Support query batching in `JdbcAccessor` when processing an INSERT query. The batch size is defined by the user and may be "infinite";
    * In `JdbcAccessor`, use `java.sql.PreparedStatement` and standard JDBC functions to process queries (and to support batching);
    * In the `setFields()` method of `JdbcResolver`, perform type conversions of the data tuples received from PXF;
    * Support both transactional and non-transactional databases when performing INSERT queries.
    
    Refactor the code of the PXF JDBC plugin and make some micro-optimizations and fixes:
    * Fix the handling of TIMESTAMP when performing SELECT requests;
    * Make the functions that build WHERE statements static where possible to reduce the number of `InputData` checks. This was proposed by @hornn;
    * Prettify some SQL statements generated by the PXF JDBC plugin and update the tests accordingly;
    * Organize imports;
    * Expand some intricate `if ... else` constructions.
    
    Improve documentation:
    * Clarify, expand or rewrite Javadoc strings;
    * Rewrite README.md.
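
The batching described above can be sketched with plain JDBC calls. This is a minimal illustration, not the plugin's code: the class `BatchInsertSketch`, the method `insertInBatches`, and the use of `setObject` for every column are assumptions made for the example. Only `PreparedStatement.addBatch()` and `executeBatch()` are taken from the JDBC API that the PR refers to.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class BatchInsertSketch {
    /**
     * Binds every row to the prepared INSERT statement and executes the
     * accumulated rows each time batchSize rows have been added.
     *
     * @return the number of executeBatch() calls that were made
     */
    static int insertInBatches(PreparedStatement statement, List<Object[]> rows, int batchSize)
            throws SQLException {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        int pending = 0;   // rows added since the last executeBatch()
        int executed = 0;  // number of executeBatch() calls so far
        for (Object[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                // JDBC parameter indexes start at 1
                statement.setObject(i + 1, row[i]);
            }
            statement.addBatch();
            pending++;
            if (pending == batchSize) {
                statement.executeBatch();
                executed++;
                pending = 0;
            }
        }
        if (pending > 0) {
            // Flush the incomplete last batch
            statement.executeBatch();
            executed++;
        }
        return executed;
    }
}
```

With five rows and a batch size of 2, this sketch calls `executeBatch()` three times: twice for the full batches and once for the remaining row.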

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arenadata/incubator-hawq pxf_jdbc_writeAndFix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-hawq/pull/1353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1353
    
----
commit ba47942574049afc72c93033e03c7a7f1cf25f13
Author: Ivan Leskin <le...@...>
Date:   2018-03-05T14:19:26Z

    Fix incorrect TIMESTAMP handling

commit 3eb0daa8b5aca8c6c85cd7e6ebec8ea4b57c2966
Author: Ivan Leskin <le...@...>
Date:   2018-03-07T17:24:01Z

    PXF JDBC plugin update
    
    * Add support for INSERT queries:
    	* The INSERT queries are processed by the same classes as the SELECT queries;
    	* INSERTs are processed by the JDBC PreparedStatement;
    	* INSERTs support batching (by means of JDBC);
    
    * Minor changes in WhereSQLBuilder and JdbcPartitionFragmenter:
    	* Removed 'WHERE 1=1';
    	* The same pattern of spaces around operators everywhere ('a = b', not 'a=b');
    	* JdbcPartitionFragmenter.buildFragmenterSql() made static to avoid extra checks of InputData (proposed by @sansanichfb);
    
    * Refactoring and some microoptimizations;

commit 98f18c54cd3a9a63c8e9b5a13fb48f4494994051
Author: Ivan Leskin <le...@...>
Date:   2018-04-02T17:57:56Z

    PXF JDBC refactoring
    
    * The README.md is completely rewritten;
    
    * Lots of changes in comments and javadoc comments;
    
    * Code refactoring and minor changes in codestyle

----


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214408552
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---
    @@ -0,0 +1,113 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +
    +/**
    + * This writer makes batch INSERTs.
    + *
    + * A call() is required after a certain number of supply() calls
    + */
    +class BatchWriterCallable implements WriterCallable {
    +    @Override
    +    public void supply(OneRow row) throws IllegalStateException {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
    +        }
    +        if (row == null) {
    +            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
    +        }
    +        rows.add(row);
    +    }
    +
    +    @Override
    +    public boolean isCallRequired() {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    @Override
    +    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
    +        if (rows.isEmpty()) {
    +            return null;
    +        }
    +
    +        boolean statementMustBeDeleted = false;
    +        if (statement == null) {
    +            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
    +            statementMustBeDeleted = true;
    +        }
    +
    +        for (OneRow row : rows) {
    +            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
    +            statement.addBatch();
    +        }
    +
    +        try {
    +            statement.executeBatch();
    +        }
    +        catch (SQLException e) {
    +            return e;
    +        }
    +        finally {
    +            rows.clear();
    +            if (statementMustBeDeleted) {
    +                JdbcPlugin.closeStatement(statement);
    +                statement = null;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    /**
    +     * Construct a new batch writer
    +     */
    +    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int maxRowsCount) throws IllegalArgumentException {
    +        if ((plugin == null) || (query == null)) {
    +            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
    +        }
    +        this.plugin = plugin;
    +        this.query = query;
    +        this.statement = statement;
    +        if (maxRowsCount < 0) {
    +            maxRowsCount = 0;
    --- End diff --
    
    Done. The default number is set by `WriterCallableFactory`: https://github.com/apache/incubator-hawq/pull/1353/commits/52e9f7f2050b6e200e8e9cdd1dba2a7f22c31923#diff-b21204dffcdf144d4d2c5203f2b9a8b5R84


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    It seems almost all of the comments have been taken care of. Let's come to a conclusion on the BATCH_SIZE semantics and implementation; that is the last outstanding question as of now.


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r182614960
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +        if (dbMeta.supportsTransactions()) {
    +            dbConn.setAutoCommit(false);
    +        }
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = dbConn.prepareStatement(queryWrite);
    +
    +        if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) {
    +            LOG.info(
    +                "The database '" +
    +                dbMeta.getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     */
    +    @Override
    +    @SuppressWarnings("unchecked")
    +    public boolean writeNextObject(OneRow row) throws SQLException, IOException {
    +        // This cast is safe because the data in the row is formed by JdbcPlugin
    +        List<OneField> tuple = (List<OneField>) row.getData();
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("writeNextObject() called");
    +        }
    +
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Field " + i + ": " + DataType.get(field.type).toString());
    +            }
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statementWrite.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BIGINT);
    +                    }
    +                    else {
    +                        statementWrite.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.SMALLINT);
    +                    }
    +                    else {
    +                        statementWrite.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.REAL);
    +                    }
    +                    else {
    +                        statementWrite.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statementWrite.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statementWrite.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statementWrite.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statementWrite.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statementWrite.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statementWrite.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statementWrite.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    --- End diff --
    
    This message is not very actionable. I'd be more specific and mention that the particular data type is not supported.
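
One way to make the `default` branch more actionable, as suggested here, is to name the offending column and type in the exception text. The sketch below assumes a hypothetical enum `FieldType` standing in for PXF's `DataType` and a hypothetical helper `requireSupported`. The set of supported types and the wording of the message are also only illustrative.

```java
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;

final class UnsupportedTypeSketch {
    /** Hypothetical stand-in for the PXF DataType enum. */
    enum FieldType { INTEGER, TEXT, TIMESTAMP, INTERVAL }

    /** Types this illustrative writer can bind (an assumption for the example). */
    private static final Set<FieldType> SUPPORTED =
            EnumSet.of(FieldType.INTEGER, FieldType.TEXT, FieldType.TIMESTAMP);

    /**
     * Throws an IOException that names the column and the unsupported type,
     * instead of a generic "tuple is corrupted" message.
     */
    static void requireSupported(FieldType type, int columnIndex) throws IOException {
        if (!SUPPORTED.contains(type)) {
            throw new IOException(
                "Column " + columnIndex + " has type " + type
                + ", which is not supported for INSERT by this plugin");
        }
    }
}
```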


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r182614235
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +        if (dbMeta.supportsTransactions()) {
    +            dbConn.setAutoCommit(false);
    +        }
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = dbConn.prepareStatement(queryWrite);
    +
    +        if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) {
    +            LOG.info(
    --- End diff --
    
    IMO a warning is more appropriate here, since the user had an expectation and explicitly set the batch size to a non-zero value.
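
A sketch of the suggested change, using `java.util.logging` as a stand-in for the commons-logging `Log` that the plugin uses (there the call would presumably be `LOG.warn(...)`). The helper `resolveBatchSize` and its boolean parameter, which stands in for `DatabaseMetaData.supportsBatchUpdates()`, are assumptions made to keep the example self-contained.

```java
import java.util.logging.Logger;

final class BatchFallbackSketch {
    private static final Logger LOG = Logger.getLogger(BatchFallbackSketch.class.getName());

    /**
     * Returns the batch size to use. When the user asked for batching
     * (a non-zero value) but the database cannot do batch updates, a
     * WARNING is logged and batching is switched off by returning 0.
     */
    static int resolveBatchSize(int requested, boolean supportsBatchUpdates, String productName) {
        if (requested != 0 && !supportsBatchUpdates) {
            LOG.warning("The database '" + productName + "' does not support batch updates. "
                    + "The current request will be handled without batching");
            return 0;
        }
        return requested;
    }
}
```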


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by kongyew <gi...@git.apache.org>.
Github user kongyew commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    **This JDBC profile is not officially supported** by Greenplum, since it has not been tested at all. Use it at your own risk. The interface may change in the future, since the current profile does not store credentials securely.


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214412634
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---
    @@ -0,0 +1,113 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +
    +/**
    + * This writer makes batch INSERTs.
    + *
    + * A call() is required after a certain number of supply() calls
    + */
    +class BatchWriterCallable implements WriterCallable {
    +    @Override
    +    public void supply(OneRow row) throws IllegalStateException {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
    +        }
    +        if (row == null) {
    +            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
    +        }
    +        rows.add(row);
    +    }
    +
    +    @Override
    +    public boolean isCallRequired() {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    @Override
    +    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
    +        if (rows.isEmpty()) {
    +            return null;
    +        }
    +
    +        boolean statementMustBeDeleted = false;
    +        if (statement == null) {
    +            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
    +            statementMustBeDeleted = true;
    +        }
    +
    +        for (OneRow row : rows) {
    +            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
    +            statement.addBatch();
    +        }
    +
    +        try {
    +            statement.executeBatch();
    +        }
    +        catch (SQLException e) {
    +            return e;
    +        }
    +        finally {
    +            rows.clear();
    +            if (statementMustBeDeleted) {
    +                JdbcPlugin.closeStatement(statement);
    +                statement = null;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    /**
    +     * Construct a new batch writer
    +     */
    +    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int maxRowsCount) throws IllegalArgumentException {
    +        if ((plugin == null) || (query == null)) {
    +            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
    +        }
    +        this.plugin = plugin;
    +        this.query = query;
    +        this.statement = statement;
    +        if (maxRowsCount < 0) {
    +            maxRowsCount = 0;
    --- End diff --
    
    Thank you for the changes. Should we still remove the case of infinite batching and error out if the user sets BATCH_SIZE < 0? This is dangerous logic, and if a given use case supports huge batches, users can define tables with appropriate batch sizes.


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by kapustor <gi...@git.apache.org>.
Github user kapustor commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214508994
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java ---
    @@ -75,16 +75,15 @@ public void setQuery(String query) {
         /**
          * Set batch size to use.
          *
    -     * @param batchSize < 0: Use batches of infinite size
    +     * @param batchSize = 0: Use batches of recommended size
          * @param batchSize = 1: Do not use batches
          * @param batchSize > 1: Use batches of the given size
    +     * @param batchSize < 0: Use batches of infinite size
          */
         public void setBatchSize(int batchSize) {
    -        if (batchSize < 0) {
    -            batchSize = 0;
    -        }
    -        else if (batchSize == 0) {
    -            batchSize = 1;
    +        if (batchSize == 0) {
    +            // Set the recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
    +            batchSize = 100;
    --- End diff --
    
    As I understand it, the optimal parameter values now look like this:
    ```
    1. nothing (not provided) -- use default of 100
    2. 0 -- no batching
    3. 1 -- no batching
    4. >1 -- use the number provided by the user
    ```
    In my opinion, we shouldn't limit the maximum `BATCH_SIZE` value (or we should set the limit very high, to 500k for example), as we don't know the use cases and external DBs that users will use it with. There may be DBs or other systems that need a large batch size for optimal inserts.
    @denalex what do you think about `BATCH_SIZE` max value?
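
    The four cases listed above could be resolved by a helper like the following. This is a sketch only: `BatchSizeOption.resolve` is a hypothetical name, `null` models "not provided", and rejecting negative values is an assumption, since the list does not cover them.

```java
/** Hypothetical helper that maps the user-supplied BATCH_SIZE to an effective value. */
final class BatchSizeOption {
    // Default taken from the diff under review
    static final int DEFAULT_BATCH_SIZE = 100;

    /**
     * @param userValue BATCH_SIZE from the table definition, or null if not provided
     * @return the batch size to use; 1 means "no batching"
     */
    static int resolve(Integer userValue) {
        if (userValue == null) {
            return DEFAULT_BATCH_SIZE;      // case 1: nothing provided
        }
        if (userValue < 0) {
            // Assumption of this sketch: negative values are not in the list above
            throw new IllegalArgumentException("BATCH_SIZE must not be negative, got " + userValue);
        }
        if (userValue <= 1) {
            return 1;                       // cases 2 and 3: no batching
        }
        return userValue;                   // case 4: use the number provided by the user
    }
}
```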


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r193735925
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    --- End diff --
    
    Fixed: https://github.com/apache/incubator-hawq/pull/1353/commits/7fe4cac858a64ed5260d0e86342980f355fe7eb1#diff-6829beb5b62de0a4880b5d5dbebca91aR70


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213476734
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java ---
    @@ -0,0 +1,364 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadResolver;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.WriteResolver;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables resolver
    + */
    +public class JdbcResolver extends JdbcPlugin implements ReadResolver, WriteResolver {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcResolver(InputData input) throws UserDataException {
    +        super(input);
    +    }
    +
    +    /**
    +     * getFields() implementation
    +     *
    +     * @throws SQLException if the provided {@link OneRow} object is invalid
    +     */
    +    @Override
    +    public List<OneField> getFields(OneRow row) throws SQLException {
    +        ResultSet result = (ResultSet) row.getData();
    +        LinkedList<OneField> fields = new LinkedList<>();
    +
    +        for (ColumnDescriptor column : columns) {
    +            String colName = column.columnName();
    +            Object value = null;
    +
    +            OneField oneField = new OneField();
    +            oneField.type = column.columnTypeCode();
    +
    +            switch (DataType.get(oneField.type)) {
    +                case INTEGER:
    +                    value = result.getInt(colName);
    +                    break;
    +                case FLOAT8:
    +                    value = result.getDouble(colName);
    +                    break;
    +                case REAL:
    +                    value = result.getFloat(colName);
    +                    break;
    +                case BIGINT:
    +                    value = result.getLong(colName);
    +                    break;
    +                case SMALLINT:
    +                    value = result.getShort(colName);
    +                    break;
    +                case BOOLEAN:
    +                    value = result.getBoolean(colName);
    +                    break;
    +                case BYTEA:
    +                    value = result.getBytes(colName);
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case NUMERIC:
    +                    value = result.getString(colName);
    +                    break;
    +                case DATE:
    +                    value = result.getDate(colName);
    +                    break;
    +                case TIMESTAMP:
    +                    value = result.getTimestamp(colName);
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +            }
    +
    +            oneField.val = value;
    +            fields.add(oneField);
    +        }
    +        return fields;
    +    }
    +
    +    /**
    +     * setFields() implementation
    +     *
    +     * @return OneRow with the data field containing a List<OneField>
    +     * OneFields are not reordered before being passed to Accessor; at the
    +     * moment, there is no way to correct the order of the fields if it is not.
    +     * In practice, the 'record' provided is always ordered the right way.
    +     *
    +     * @throws UnsupportedOperationException if field of some type is not supported
    +     */
    +    @Override
    +    public OneRow setFields(List<OneField> record) throws UnsupportedOperationException, ParseException {
    +        int column_index = 0;
    +        for (OneField oneField : record) {
    +            ColumnDescriptor column = columns.get(column_index);
    +            if (
    +                LOG.isDebugEnabled() &&
    +                DataType.get(column.columnTypeCode()) != DataType.get(oneField.type)
    +            ) {
    +                LOG.warn("The provided tuple of data may be disordered. Datatype of column with descriptor '" + column.toString() + "' must be '" + DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + DataType.get(oneField.type).toString() + "'");
    +            }
    +
    +            // Check that data type is supported
    +            switch (DataType.get(oneField.type)) {
    +                case BOOLEAN:
    +                case INTEGER:
    +                case FLOAT8:
    +                case REAL:
    +                case BIGINT:
    +                case SMALLINT:
    +                case NUMERIC:
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case BYTEA:
    +                case TIMESTAMP:
    +                case DATE:
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +            }
    +
    +            if (LOG.isDebugEnabled()) {
    +                if (DataType.get(oneField.type) == DataType.BYTEA) {
    +                    LOG.debug("OneField content (conversion from BYTEA): '" + new String((byte[])oneField.val) + "'");
    +                }
    +            }
    +
    +            // Convert TEXT columns into native data types
    +            if ((oneField.val != null) && (DataType.get(oneField.type) == DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
    +                String rawVal = (String)oneField.val;
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("OneField content (conversion from TEXT): '" + rawVal + "'");
    +                }
    +                switch (DataType.get(column.columnTypeCode())) {
    +                    case VARCHAR:
    +                    case BPCHAR:
    +                    case TEXT:
    +                    case BYTEA:
    +                        break;
    +                    case BOOLEAN:
    +                        oneField.val = (Object)Boolean.parseBoolean(rawVal);
    +                        break;
    +                    case INTEGER:
    +                        oneField.val = (Object)Integer.parseInt(rawVal);
    +                        break;
    +                    case FLOAT8:
    +                        oneField.val = (Object)Double.parseDouble(rawVal);
    +                        break;
    +                    case REAL:
    +                        oneField.val = (Object)Float.parseFloat(rawVal);
    +                        break;
    +                    case BIGINT:
    +                        oneField.val = (Object)Long.parseLong(rawVal);
    +                        break;
    +                    case SMALLINT:
    +                        oneField.val = (Object)Short.parseShort(rawVal);
    +                        break;
    +                    case NUMERIC:
    +                        oneField.val = (Object)new BigDecimal(rawVal);
    +                        break;
    +                    case TIMESTAMP:
    +                        boolean isConversionSuccessful = false;
    +                        for (SimpleDateFormat sdf : timestampSDFs.get()) {
    +                            try {
    +                                java.util.Date parsedTimestamp = sdf.parse(rawVal);
    +                                oneField.val = (Object)new Timestamp(parsedTimestamp.getTime());
    +                                isConversionSuccessful = true;
    +                                break;
    +                            }
    +                            catch (ParseException e) {
    +                                // pass
    +                            }
    +                        }
    +                        if (!isConversionSuccessful) {
    +                            throw new ParseException(rawVal, 0);
    +                        }
    +                        break;
    +                    case DATE:
    +                        oneField.val = (Object)new Date(dateSDF.get().parse(rawVal).getTime());
    +                        break;
    +                    default:
    +                        throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +                }
    +                oneField.type = column.columnTypeCode();
    +            }
    +
    +            column_index += 1;
    +        }
    +        return new OneRow(new LinkedList<OneField>(record));
    +    }
    +
    +    /**
    +     * Decode OneRow object and pass all its contents to a PreparedStatement
    +     *
    +     * @throws IOException if data in a OneRow is corrupted
    +     * @throws SQLException if the given statement is broken
    +     */
    +    @SuppressWarnings("unchecked")
    +    public static void decodeOneRowToPreparedStatement(OneRow row, PreparedStatement statement) throws IOException, SQLException {
    +        // This is safe: OneRow comes from JdbcResolver
    +        List<OneField> tuple = (List<OneField>)row.getData();
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.FLOAT);
    +                    }
    +                    else {
    +                        statement.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statement.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statement.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statement.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statement.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statement.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statement.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statement.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    +            }
    +        }
    +    }
    +
    +    private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
    +
    +    // SimpleDateFormat to parse TEXT into DATE
    +    private static ThreadLocal<SimpleDateFormat> dateSDF = new ThreadLocal<SimpleDateFormat>() {
    --- End diff --
    
    How are these `ThreadLocal`s cleaned up? If this logic executes on the Tomcat thread pool, threads are created and discarded as load changes, so when a thread is discarded its thread-local value needs to be cleaned up, which is the usual problem with `ThreadLocal`s. Maybe use a thread-safe library for formatting, like joda-time?
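
    One thread-safe alternative, sketched here with `java.time` from Java 8 rather than joda-time: `DateTimeFormatter` instances are immutable and thread-safe, so a single shared constant can replace each `ThreadLocal<SimpleDateFormat>`. The class name and the patterns below are assumptions; the formats actually accepted by the plugin are not shown in this diff.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical replacement for the ThreadLocal-based parsers (requires Java 8+). */
final class ThreadSafeDateParsing {
    // Shared across threads: DateTimeFormatter is immutable, so no per-thread copy
    // (and no cleanup when a pool thread is discarded) is needed.
    // Both patterns are assumptions made for this sketch.
    private static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd");
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    /** Parses TEXT into java.sql.Date; throws DateTimeParseException on bad input. */
    static Date parseDate(String raw) {
        return Date.valueOf(LocalDate.parse(raw, DATE_FORMAT));
    }

    /** Parses TEXT into java.sql.Timestamp; throws DateTimeParseException on bad input. */
    static Timestamp parseTimestamp(String raw) {
        return Timestamp.valueOf(LocalDateTime.parse(raw, TIMESTAMP_FORMAT));
    }
}
```

    Note that `DateTimeParseException` is unchecked, unlike the `ParseException` thrown by `SimpleDateFormat`, so callers that declare `throws ParseException` would need to be adjusted.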


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by deem0n <gi...@git.apache.org>.
Github user deem0n commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213577440
  
    --- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
    @@ -177,11 +177,11 @@ under the License.
         </profile>
         <profile>
             <name>Jdbc</name>
    -        <description>A profile for reading data into HAWQ via JDBC</description>
    +        <description>A profile to access (read & write) data via JDBC</description>
    --- End diff --
    
    Or maybe change `&` to `&amp;`?
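
    A quick way to check this, as a sketch using the JDK's DOM parser: a raw `&` makes the document ill-formed, while `&amp;` parses and yields the intended text. `ProfileDescriptionCheck` is a hypothetical helper, and the XML fragments passed to it are cut down from the profile above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

/** Hypothetical helper that reads the first description element of an XML fragment. */
final class ProfileDescriptionCheck {
    /** Returns the description text, or null if the XML is not well-formed. */
    static String readDescription(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(
                    new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getElementsByTagName("description").item(0).getTextContent();
        } catch (SAXException e) {
            // A raw '&' that does not start an entity reference ends up here
            return null;
        } catch (ParserConfigurationException | IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    The default parser may also print a "[Fatal Error]" line to stderr for the ill-formed input; that output is harmless here.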


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214444738
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java ---
    @@ -75,16 +75,15 @@ public void setQuery(String query) {
         /**
          * Set batch size to use.
          *
    -     * @param batchSize < 0: Use batches of infinite size
    +     * @param batchSize = 0: Use batches of recommended size
          * @param batchSize = 1: Do not use batches
          * @param batchSize > 1: Use batches of the given size
    +     * @param batchSize < 0: Use batches of infinite size
          */
         public void setBatchSize(int batchSize) {
    -        if (batchSize < 0) {
    -            batchSize = 0;
    -        }
    -        else if (batchSize == 0) {
    -            batchSize = 1;
    +        if (batchSize == 0) {
    +            // Set the recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
    +            batchSize = 100;
    --- End diff --
    
    Oh, thanks, got it. I thought batch on/off was a separate property. If we want to stick with a single property (and keep it simple for users), then I think using batching by default (with a default value) is a good idea, provided the backend DB supports it. This way, though, we will have to let users turn it off for a query, which they can do by setting the value to 0 (batch feature is off) or 1 (a batch of size 1 practically means no batching; the implementation can take care of that). I think this is closer to your original design, except that I would take out negative values to prohibit unlimited size, and maybe also put an upper bound on the batch size to prevent crashing PXF with OOM.
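
    A minimal sketch of the bounds check described above. `BatchSizeLimits` and the value of `MAX_BATCH_SIZE` are hypothetical; a sensible upper bound would depend on row width and the memory available to PXF.

```java
/** Hypothetical bounds check for the BATCH_SIZE property. */
final class BatchSizeLimits {
    // Placeholder value chosen for this sketch only
    static final int MAX_BATCH_SIZE = 10_000;

    /** Returns batchSize unchanged if it is within [0, MAX_BATCH_SIZE], else throws. */
    static int validate(int batchSize) {
        if (batchSize < 0) {
            throw new IllegalArgumentException(
                    "BATCH_SIZE must not be negative, got " + batchSize);
        }
        if (batchSize > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException(
                    "BATCH_SIZE must not exceed " + MAX_BATCH_SIZE + ", got " + batchSize);
        }
        return batchSize;
    }

    /** 0 (feature off) and 1 (batch of one row) both mean "no batching". */
    static boolean isBatchingEnabled(int batchSize) {
        return validate(batchSize) > 1;
    }
}
```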


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214430750
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java ---
    @@ -0,0 +1,364 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadResolver;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.WriteResolver;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables resolver
    + */
    +public class JdbcResolver extends JdbcPlugin implements ReadResolver, WriteResolver {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcResolver(InputData input) throws UserDataException {
    +        super(input);
    +    }
    +
    +    /**
    +     * getFields() implementation
    +     *
    +     * @throws SQLException if the provided {@link OneRow} object is invalid
    +     */
    +    @Override
    +    public List<OneField> getFields(OneRow row) throws SQLException {
    +        ResultSet result = (ResultSet) row.getData();
    +        LinkedList<OneField> fields = new LinkedList<>();
    +
    +        for (ColumnDescriptor column : columns) {
    +            String colName = column.columnName();
    +            Object value = null;
    +
    +            OneField oneField = new OneField();
    +            oneField.type = column.columnTypeCode();
    +
    +            switch (DataType.get(oneField.type)) {
    +                case INTEGER:
    +                    value = result.getInt(colName);
    +                    break;
    +                case FLOAT8:
    +                    value = result.getDouble(colName);
    +                    break;
    +                case REAL:
    +                    value = result.getFloat(colName);
    +                    break;
    +                case BIGINT:
    +                    value = result.getLong(colName);
    +                    break;
    +                case SMALLINT:
    +                    value = result.getShort(colName);
    +                    break;
    +                case BOOLEAN:
    +                    value = result.getBoolean(colName);
    +                    break;
    +                case BYTEA:
    +                    value = result.getBytes(colName);
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case NUMERIC:
    +                    value = result.getString(colName);
    +                    break;
    +                case DATE:
    +                    value = result.getDate(colName);
    +                    break;
    +                case TIMESTAMP:
    +                    value = result.getTimestamp(colName);
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +            }
    +
    +            oneField.val = value;
    +            fields.add(oneField);
    +        }
    +        return fields;
    +    }
    +
    +    /**
    +     * setFields() implementation
    +     *
    +     * @return OneRow with the data field containing a List<OneField>
    +     * OneFields are not reordered before being passed to Accessor; at the
    +     * moment, there is no way to correct the order of the fields if it is not.
    +     * In practice, the 'record' provided is always ordered the right way.
    +     *
    +     * @throws UnsupportedOperationException if field of some type is not supported
    +     */
    +    @Override
    +    public OneRow setFields(List<OneField> record) throws UnsupportedOperationException, ParseException {
    +        int column_index = 0;
    +        for (OneField oneField : record) {
    +            ColumnDescriptor column = columns.get(column_index);
    +            if (
    +                LOG.isDebugEnabled() &&
    +                DataType.get(column.columnTypeCode()) != DataType.get(oneField.type)
    +            ) {
    +                LOG.warn("The provided tuple of data may be disordered. Datatype of column with descriptor '" + column.toString() + "' must be '" + DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + DataType.get(oneField.type).toString() + "'");
    +            }
    +
    +            // Check that data type is supported
    +            switch (DataType.get(oneField.type)) {
    +                case BOOLEAN:
    +                case INTEGER:
    +                case FLOAT8:
    +                case REAL:
    +                case BIGINT:
    +                case SMALLINT:
    +                case NUMERIC:
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case BYTEA:
    +                case TIMESTAMP:
    +                case DATE:
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +            }
    +
    +            if (LOG.isDebugEnabled()) {
    +                if (DataType.get(oneField.type) == DataType.BYTEA) {
    +                    LOG.debug("OneField content (conversion from BYTEA): '" + new String((byte[])oneField.val) + "'");
    +                }
    +            }
    +
    +            // Convert TEXT columns into native data types
    +            if ((oneField.val != null) && (DataType.get(oneField.type) == DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
    +                String rawVal = (String)oneField.val;
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("OneField content (conversion from TEXT): '" + rawVal + "'");
    +                }
    +                switch (DataType.get(column.columnTypeCode())) {
    +                    case VARCHAR:
    +                    case BPCHAR:
    +                    case TEXT:
    +                    case BYTEA:
    +                        break;
    +                    case BOOLEAN:
    +                        oneField.val = (Object)Boolean.parseBoolean(rawVal);
    +                        break;
    +                    case INTEGER:
    +                        oneField.val = (Object)Integer.parseInt(rawVal);
    +                        break;
    +                    case FLOAT8:
    +                        oneField.val = (Object)Double.parseDouble(rawVal);
    +                        break;
    +                    case REAL:
    +                        oneField.val = (Object)Float.parseFloat(rawVal);
    +                        break;
    +                    case BIGINT:
    +                        oneField.val = (Object)Long.parseLong(rawVal);
    +                        break;
    +                    case SMALLINT:
    +                        oneField.val = (Object)Short.parseShort(rawVal);
    +                        break;
    +                    case NUMERIC:
    +                        oneField.val = (Object)new BigDecimal(rawVal);
    +                        break;
    +                    case TIMESTAMP:
    +                        boolean isConversionSuccessful = false;
    +                        for (SimpleDateFormat sdf : timestampSDFs.get()) {
    +                            try {
    +                                java.util.Date parsedTimestamp = sdf.parse(rawVal);
    +                                oneField.val = (Object)new Timestamp(parsedTimestamp.getTime());
    +                                isConversionSuccessful = true;
    +                                break;
    +                            }
    +                            catch (ParseException e) {
    +                                // pass
    +                            }
    +                        }
    +                        if (!isConversionSuccessful) {
    +                            throw new ParseException(rawVal, 0);
    +                        }
    +                        break;
    +                    case DATE:
    +                        oneField.val = (Object)new Date(dateSDF.get().parse(rawVal).getTime());
    +                        break;
    +                    default:
    +                        throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +                }
    +                oneField.type = column.columnTypeCode();
    +            }
    +
    +            column_index += 1;
    +        }
    +        return new OneRow(new LinkedList<OneField>(record));
    +    }
    +
    +    /**
    +     * Decode OneRow object and pass all its contents to a PreparedStatement
    +     *
    +     * @throws IOException if data in a OneRow is corrupted
    +     * @throws SQLException if the given statement is broken
    +     */
    +    @SuppressWarnings("unchecked")
    +    public static void decodeOneRowToPreparedStatement(OneRow row, PreparedStatement statement) throws IOException, SQLException {
    +        // This is safe: OneRow comes from JdbcResolver
    +        List<OneField> tuple = (List<OneField>)row.getData();
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BIGINT);
    +                    }
    +                    else {
    +                        statement.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.SMALLINT);
    +                    }
    +                    else {
    +                        statement.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.FLOAT);
    +                    }
    +                    else {
    +                        statement.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statement.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statement.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statement.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statement.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statement.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statement.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statement.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    +            }
    +        }
    +    }
    +
    +    private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
    +
    +    // SimpleDateFormat to parse TEXT into DATE
    +    private static ThreadLocal<SimpleDateFormat> dateSDF = new ThreadLocal<SimpleDateFormat>() {
    --- End diff --
    
    @denalex, I would be glad if you could explain the details of a `ThreadLocal`'s lifecycle in Tomcat. As far as I know (see https://docs.oracle.com/javase/7/docs/api/java/lang/ThreadLocal.html), every `ThreadLocal` object is subject to garbage collection when a thread dies.
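    
    For context, here is a minimal sketch of the pattern in question. It assumes (this is not taken from the patch) that the servlet container reuses pooled worker threads, so a thread-local value stays reachable until `remove()` is called or the thread ends. The class name `ThreadLocalDateFormatSketch` and the `release()` hook are hypothetical.
    
    ```java
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    class ThreadLocalDateFormatSketch {
        // SimpleDateFormat is not thread-safe, so each thread gets its own instance
        private static final ThreadLocal<SimpleDateFormat> DATE_SDF = new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                return new SimpleDateFormat("yyyy-MM-dd");
            }
        };
    
        // Parse TEXT into a date using the calling thread's formatter
        static Date parseDate(String raw) {
            try {
                return DATE_SDF.get().parse(raw);
            } catch (ParseException e) {
                throw new IllegalArgumentException("Not a yyyy-MM-dd date: " + raw, e);
            }
        }
    
        // Hypothetical cleanup hook: drops this thread's formatter so that a
        // long-lived pooled thread does not keep it reachable
        static void release() {
            DATE_SDF.remove();
        }
    
        public static void main(String[] args) {
            Date parsed = parseDate("2018-04-09");
            System.out.println(new SimpleDateFormat("yyyy-MM-dd").format(parsed));
            release();
        }
    }
    ```
    
    Whether an explicit `remove()` is needed in PXF depends on how Tomcat manages its worker threads, which is exactly the detail I am asking about.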


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by mebelousov <gi...@git.apache.org>.
Github user mebelousov commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    This is a great feature.
    We really need to upload data from Greenplum to other databases.


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r182613451
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    --- End diff --
    
    I am wondering how likely this is to happen?


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by kapustor <gi...@git.apache.org>.
Github user kapustor commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Hi @sansanichfb 
    
    We've removed the notice about consistency and transactions from the error text.
    
    Thank you!


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214428410
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java ---
    @@ -75,16 +75,15 @@ public void setQuery(String query) {
         /**
          * Set batch size to use.
          *
    -     * @param batchSize < 0: Use batches of infinite size
    +     * @param batchSize = 0: Use batches of recommended size
          * @param batchSize = 1: Do not use batches
          * @param batchSize > 1: Use batches of the given size
    +     * @param batchSize < 0: Use batches of infinite size
          */
         public void setBatchSize(int batchSize) {
    -        if (batchSize < 0) {
    -            batchSize = 0;
    -        }
    -        else if (batchSize == 0) {
    -            batchSize = 1;
    +        if (batchSize == 0) {
    +            // Set the recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
    +            batchSize = 100;
    --- End diff --
    
    This makes sense.
    Do you propose to use batches by default? Currently, `BATCH_SIZE` controls not only the size of a batch, but also whether it is used or not.
    Also, if we allow the user not to use batches, we should make `1` (or some other value) a valid `BATCH_SIZE`.
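    
    To make the semantics under discussion concrete, here is a minimal sketch of how the values listed in the Javadoc above could be interpreted. The class and method names are hypothetical; only the value `100` and the four cases come from the diff.
    
    ```java
    class BatchSizeSketch {
        // Recommended value used in the diff above when BATCH_SIZE is 0
        static final int RECOMMENDED_BATCH_SIZE = 100;
    
        // Map the user-provided BATCH_SIZE to the size actually used
        static int effectiveBatchSize(int requested) {
            return requested == 0 ? RECOMMENDED_BATCH_SIZE : requested;
        }
    
        // A size of exactly 1 means "do not use batches"
        static boolean usesBatching(int requested) {
            return effectiveBatchSize(requested) != 1;
        }
    
        // A negative size means a single batch of unlimited ("infinite") size
        static boolean isUnbounded(int requested) {
            return requested < 0;
        }
    
        public static void main(String[] args) {
            for (int size : new int[] {-1, 0, 1, 50}) {
                System.out.println(size + " -> effective=" + effectiveBatchSize(size)
                    + ", batching=" + usesBatching(size)
                    + ", unbounded=" + isUnbounded(size));
            }
        }
    }
    ```
    
    With this reading, `BATCH_SIZE=1` is the explicit way to switch batching off, which is why it would have to remain a valid value.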


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by deem0n <gi...@git.apache.org>.
Github user deem0n commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Yep, our customers have plenty of different data sources, mostly JDBC compliant. This feature makes GPDB a very flexible and friendly player in the corporate IT landscape.


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by kapustor <gi...@git.apache.org>.
Github user kapustor commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Hi @sansanichfb!
    
    > Also, I am wondering, which target database patch was tested against? The transactional approach doesn't seem clear to me - if DB doesn't support transactions - you are still sending data in batches, which might leave target db in the non-consistent state since you can't rollback the last transaction. Should we reconsider this approach and send only one batch(not ideal though, maybe ask users to tweak partitioning) in that case?
    
    We have tested the patch with Oracle, Postgres, MSSQL and Ignite cluster targets. They all work fine, and the performance is good enough (especially for Ignite: about 200 Mbyte/s for two-node Greenplum and Ignite clusters).
    If you know of any other target that should be tested, please let me know and I will test it.
    As for the transactional approach: even if the target does support transactions, there may still be inconsistency in the data, for example if one of the GP segments fails to insert its batch (other segments may succeed, so only part of the data will be inserted). As for sending all the data in one batch: as I understand it, such an approach would limit the maximum data size to the memory available to the PXF service, because PXF has to aggregate the batch in its memory.
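    
    The memory argument can be illustrated with a minimal sketch (this is not the plugin's code; `BoundedBatchSketch` and `flushSizes` are hypothetical names). Rows are buffered and flushed every `batchSize` rows, so at most `batchSize` rows are held at once; a non-positive size is assumed here to mean one batch that holds everything. In a real writer, each flush would correspond to a `PreparedStatement.executeBatch()` call.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    class BoundedBatchSketch {
        // Consume rows and return the size of every flushed batch
        static List<Integer> flushSizes(Iterator<String> rows, int batchSize) {
            List<Integer> flushed = new ArrayList<>();
            List<String> buffer = new ArrayList<>();
            while (rows.hasNext()) {
                buffer.add(rows.next());
                // Flush as soon as the buffer reaches the configured size
                if (batchSize > 0 && buffer.size() >= batchSize) {
                    flushed.add(buffer.size());
                    buffer.clear();
                }
            }
            // Flush the remainder (or everything, if the batch is unbounded)
            if (!buffer.isEmpty()) {
                flushed.add(buffer.size());
            }
            return flushed;
        }
    
        public static void main(String[] args) {
            List<String> rows = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
            System.out.println(flushSizes(rows.iterator(), 4));
            System.out.println(flushSizes(rows.iterator(), -1));
        }
    }
    ```
    
    With a bounded size the buffer never grows past `batchSize` rows, while the unbounded case has to keep the whole data set in memory before the single flush.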


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r182618597
  
    --- Diff: pxf/pxf-jdbc/README.md ---
    @@ -1,139 +1,235 @@
    -# Accessing Jdbc Table Data
    +# PXF JDBC plugin
     
    -The PXF JDBC plug-in reads data stored in Traditional relational database,ie : mysql,ORACLE,postgresql.
    +The PXF JDBC plugin allows to access external databases that implement [the Java Database Connectivity API](http://www.oracle.com/technetwork/java/javase/jdbc/index.html). Both read (SELECT) and write (INSERT) operations are supported by the plugin.
     
    -PXF-JDBC plug-in is the client of the database, the host running the database engine does not need to
    -deploy PXF.
    +PXF JDBC plugin is a JDBC client. The host running the external database does not need to deploy PXF.
     
     
    -# Prerequisites
    +## Prerequisites
     
    -Check the following before using PXF to access JDBC Table:
    -* The PXF JDBC plug-in is installed on all cluster nodes.
    -* The JDBC JAR files are installed on all cluster nodes, and added to file - 'pxf-public.classpath'
    -* You have tested PXF on HDFS.
    +Check the following before using the PXF JDBC plugin:
     
    -# Using PXF Tables to Query JDBC Table
    -Jdbc tables are defined in same schema in PXF.The PXF table has the same column name
    -as Jdbc Table, and the column type requires a mapping of Jdbc-HAWQ.
    +* The PXF JDBC plugin is installed on all PXF nodes;
    +* The JDBC driver for external database is installed on all PXF nodes;
    +* All PXF nodes are allowed to connect to the external database.
     
    -## Syntax Example
    -The following PXF table definition is valid for Jdbc Table.
     
    -    CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name
    -        ( column_name data_type [, ...] | LIKE other_table )
    -    LOCATION ('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>')
    -    FORMAT 'CUSTOM' (formatter='pxfwritable_import')
    -If `jdbc-schema-name` is omitted, pxf will default to the `default` schema.
    +## Limitations
     
    -The `column_name` must exists in jdbc-table,`data_type` equals or similar to
    -the jdbc-column type.
    +Both **PXF table** **and** a **table in external database** **must have the same definition**. Their columns must have the same names, and the columns' types must correspond.
     
    -where `<pxf-parameters>` is:
    +**Not all data types are supported** by the plugin. The following PXF data types are supported:
     
    -    [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
    -    &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor
    -    &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver]
    -    | PROFILE=Jdbc
    +* `INTEGER`, `BIGINT`, `SMALLINT`
    +* `REAL`, `FLOAT8`
    +* `NUMERIC`
    +* `BOOLEAN`
    +* `VARCHAR`, `BPCHAR`, `TEXT`
    +* `DATE`
    +* `TIMESTAMP`
    +* `BYTEA`
     
    -where `<custom-parameters>` is:
    +The `<full_external_table_name>` (see below) **must not match** the [pattern](https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html) `/.*/[0-9]*-[0-9]*_[0-9]*` (the name must not start with `/` and have an ending that consists of `/` and three groups of numbers of arbitrary length, the first two separated by `-` and the last two separated by `_`. For example, the following table name is not allowed: `/public.table/1-2_3`).
     
    -    JDBC_DRIVER=<jdbc-driver-class-name>
    -     &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password>
    +At the moment, one PXF external table cannot serve both SELECT and INSERT queries. A separate PXF external table is required for each type of queries.
     
    -## Jdbc Table to HAWQ Data Type Mapping
    -Jdbc-table and hawq-table data type system is similar to, does not require
    -a special type of mapping.
    -# Usage
    -The following to mysql, for example, describes the use of PDF-JDBC.
     
    -To query MySQL Table in HAWQ, perform the following steps:
    -1. create Table in MySQL
    +## Syntax
    +```
    +CREATE [ READABLE | WRITABLE ] EXTERNAL TABLE <table_name> (
    +    { <column_name> <data_type> [, ...] | LIKE <other_table> }
    +)
    +LOCATION (
    +    'pxf://<full_external_table_name>?<pxf_parameters><jdbc_required_parameters><jdbc_login_parameters><plugin_parameters>'
    +)
    +FORMAT 'CUSTOM' (FORMATTER={'pxfwritable_import' | 'pxfwritable_export'})
    +```
     
    -         mysql> use demodb;
    -         mysql> create table myclass(
    -                 id int(4) not null primary key,
    -                 name varchar(20) not null,
    -                 gender int(4) not null default '0',
    -                 degree double(16,2));`
    -2. insert test data
    +The **`<pxf_parameters>`** are:
    +```
    +{
    +PROFILE=JDBC
    +|
    +FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
    +&ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor
    +&RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcResolver
    +}
    +```
     
    -        insert into myclass values(1,"tom",1,90);
    -        insert into myclass values(2,'john',0,94);
    -        insert into myclass values(3,'simon',1,79);
    -3. copy mysql-jdbc jar files to `/usr/lib/pxf` (on all cluster nodes), and
    -edit `/etc/pxf/conf/pxf-public.classpath` , add :
    +The **`<jdbc_required_parameters>`** are:
    +```
    +&JDBC_DRIVER=<external_database_jdbc_driver>
    +&DB_URL=<external_database_url>
    +```
     
    -        /usr/lib/pxf/mysql-connector-java-*.jar
    +The **`<jdbc_login_parameters>`** are **optional**, but if provided, both of them must be present:
    +```
    +&USER=<external_database_login>
    +&PASS=<external_database_password>
    +```
     
    -     Restart all pxf-engine.
    +The **`<plugin_parameters>`** are **optional**.
     
    -4. create Table in HAWQ:
    +The meaning of `BATCH_SIZE` is given in section [batching of INSERT queries](#Batching).
     
    -        gpadmin=# CREATE EXTERNAL TABLE myclass(id integer,
    -             name text,
    -             gender integer,
    -             degree float8)
    -             LOCATION ('pxf://localhost:51200/demodb.myclass'
    -                     '?PROFILE=JDBC'
    -                     '&JDBC_DRIVER=com.mysql.jdbc.Driver'
    -                     '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
    -                     )
    -            FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
    +The meaning of other parameters is given in section [partitioning](#Partitioning).
    +```
    +{
    +&BATCH_SIZE=<batch_size>
    +|
    +&PARTITION_BY=<column>:<column_type>
    +&RANGE=<start_value>:<end_value>
    +[&INTERVAL=<value>[:<unit>]]
    +}
    +```
     
    -MySQL instance IP: 192.168.200.6, port: 3306.
     
    -5. query mysql data in HAWQ:
    +## SELECT queries
     
    -        gpadmin=# select * from myclass;
    -        gpadmin=# select * from myclass where id=2;
    +The PXF JDBC plugin allows to perform SELECT queries to external tables.
     
    -# Jdbc Table Fragments
    -## intro
    -PXF-JDBC plug-in as a  client to access jdbc database.By default, there is
    -only one pxf-instance connectied JDBC Table.If the jdbc table data is large,
    -you can also use multiple pxf-instance to access the JDBC table by fragments.
    +To perform SELECT queries, create an `EXTERNAL READABLE TABLE` or just an `EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')` in PXF.
     
    -## Syntax
    -where `<custom-parameters>` can use following partition parameters:
    -
    -    PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]
    -The `PARTITION_BY` parameter indicates which  column to use as the partition column.
    -It can be split by colon(':'),the `column_type` current supported : `date|int|enum` .
    -The Date format is `yyyy-MM-dd`.
    -The `PARTITION_BY` parameter can be null, and there will be only one fragment.
    -
    -The `RANGE` parameter indicates the range of data to be queried , it can be split by colon(':').
    - The range is left-closed, ie: `>= start_value AND < end_value` .
    -
    -The `INTERVAL` parameter can be split by colon(':'), indicate the interval
    - value of one fragment. When `column_type` is `date`,this parameter must
    - be split by colon, and `interval_unit` can be `year|month|day`. When
    - `column_type` is int, the `interval_unit` can be empty. When `column_type`
    - is enum,the `INTERVAL` parameter can be empty.
    -
    -The syntax examples is :
    -
    -    * PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'
    -    * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1
    -    * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad
    -
    -## Usage
    -MySQL Table:
    -
    -    CREATE TABLE sales (id int primary key, cdate date, amt decimal(10,2),grade varchar(30))
    -HAWQ Table:
    -
    -    CREATE EXTERNAL TABLE sales(id integer,
    -                 cdate date,
    -                 amt float8,
    -                 grade text)
    -                 LOCATION ('pxf://localhost:51200/sales'
    -                         '?PROFILE=JDBC'
    -                         '&JDBC_DRIVER=com.mysql.jdbc.Driver'
    -                         '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
    -                         '&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year'
    -                         )
    -                 FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
    -At PXF-JDBC plugin,this will generate 2 fragments.Then HAWQ assign these fragments to 2 PXF-instance
    -to access jdbc table data.
    \ No newline at end of file
    +The `BATCH_SIZE` parameter is *not used* in such tables. *However*, if this parameter is present, its value will be checked for correctness (it must be an integer).
    +
    +
    +## INSERT queries
    +
    +The PXF JDBC plugin allows to perform INSERT queries to external tables.
    +
    +To perform INSERT queries, create an `EXTERNAL WRITABLE TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')` in PXF.
    +
    +The `PARTITION_BY`, `RANGE` and `INTERVAL` parameters are ignored in such tables.
    +
    +
    +### Batching
    +
    +The INSERT queries can be batched. This may significantly increase performance if batching is supported by the external database.
    +
    +If an **external database does not support transactions** and the INSERTion of one of the tuples failed, some tuples still may be inserted into the external database. The actual result depends on the external database.
    +
    +To enable batching, create an external table with the parameter `BATCH_SIZE` set to:
    +* `integer > 1`. Batches of the given size will be used;
    +* `integer < 0`. A batch of infinite size will be used (all tuples will be sent in one huge JDBC query). Note that this behaviour **may cause errors**, as each database has its own limit on the size of JDBC queries;
    +* `0` or `1`. Batching will not be used.
    +
    +Batching must be supported by the JDBC driver of an external database. If the driver does not support batching, it will not be used, but PXF plugin will try to INSERT values anyway, and an information message will be added to PXF logs.
    +
    +By default (`BATCH_SIZE` is absent), batching is not used.
    +
    +
    +## Partitioning
    +
    +PXF JDBC plugin supports simultaneous *read* access to an external table from multiple PXF segments. This function is called partitioning.
    +
    +
    +### Syntax
    +Use the following `<plugin_parameters>` (mentioned above) to activate partitioning:
    +
    +```
    +&PARTITION_BY=<column>:<column_type>
    +&RANGE=<start_value>:<end_value>
    +[&INTERVAL=<value>[:<unit>]]
    +```
    +
    +* The `PARTITION_BY` parameter indicates which column to use as a partition column. Only one column can be used as the partition column
    +    * The `<column>` is the name of a partition column;
    +    * The `<column_type>` is the data type of a partition column. At the moment, the **supported types** are `INT`, `DATE` and `ENUM`.
    +
    +* The `RANGE` parameter indicates the range of data to be queried.
    +    * If the partition type is `ENUM`, the `RANGE` parameter must be a list of values, each of which forms its own fragment;
    +    * If the partition type is `INT` or `DATE`, the `RANGE` parameter must be a finite left-closed range ( `... >= start_value AND ... < end_value`);
    +    * For `DATE` partitions, the date format must be `yyyy-MM-dd`.
    +
    +* The `INTERVAL` parameter is **required** for `INT` and `DATE` partitions. It is ignored if `<column_type>` is `ENUM`.
    +    * The `<value>` is the size of each fragment (the last one may be made smaller by the plugin);
    +    * The `<unit>` **must** be provided if `<column_type>` is `DATE`. `year`, `month` and `day` are supported. This parameter is ignored in case of any other `<column_type>`.
    +
    +Example partitions:
    +* `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2`
    +* `&PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month`
    +* `&PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad`
    +
    +
    +### Mechanism
    +
    +Note that **by default PXF does not support more than 100 fragments**.
    --- End diff --
    
    Can you please explain where 100 comes from?


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by kapustor <gi...@git.apache.org>.
Github user kapustor commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Hi @sansanichfb,
    
    Any news on this PR?


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r183218849
  
    --- Diff: pxf/pxf-jdbc/README.md ---
    @@ -1,139 +1,235 @@
    -# Accessing Jdbc Table Data
    +# PXF JDBC plugin
     
    -The PXF JDBC plug-in reads data stored in Traditional relational database,ie : mysql,ORACLE,postgresql.
    +The PXF JDBC plugin allows to access external databases that implement [the Java Database Connectivity API](http://www.oracle.com/technetwork/java/javase/jdbc/index.html). Both read (SELECT) and write (INSERT) operations are supported by the plugin.
     
    -PXF-JDBC plug-in is the client of the database, the host running the database engine does not need to
    -deploy PXF.
    +PXF JDBC plugin is a JDBC client. The host running the external database does not need to deploy PXF.
     
     
    -# Prerequisites
    +## Prerequisites
     
    -Check the following before using PXF to access JDBC Table:
    -* The PXF JDBC plug-in is installed on all cluster nodes.
    -* The JDBC JAR files are installed on all cluster nodes, and added to file - 'pxf-public.classpath'
    -* You have tested PXF on HDFS.
    +Check the following before using the PXF JDBC plugin:
     
    -# Using PXF Tables to Query JDBC Table
    -Jdbc tables are defined in same schema in PXF.The PXF table has the same column name
    -as Jdbc Table, and the column type requires a mapping of Jdbc-HAWQ.
    +* The PXF JDBC plugin is installed on all PXF nodes;
    +* The JDBC driver for external database is installed on all PXF nodes;
    +* All PXF nodes are allowed to connect to the external database.
     
    -## Syntax Example
    -The following PXF table definition is valid for Jdbc Table.
     
    -    CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name
    -        ( column_name data_type [, ...] | LIKE other_table )
    -    LOCATION ('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>')
    -    FORMAT 'CUSTOM' (formatter='pxfwritable_import')
    -If `jdbc-schema-name` is omitted, pxf will default to the `default` schema.
    +## Limitations
     
    -The `column_name` must exists in jdbc-table,`data_type` equals or similar to
    -the jdbc-column type.
    +Both **PXF table** **and** a **table in external database** **must have the same definition**. Their columns must have the same names, and the columns' types must correspond.
     
    -where `<pxf-parameters>` is:
    +**Not all data types are supported** by the plugin. The following PXF data types are supported:
     
    -    [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
    -    &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor
    -    &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver]
    -    | PROFILE=Jdbc
    +* `INTEGER`, `BIGINT`, `SMALLINT`
    +* `REAL`, `FLOAT8`
    +* `NUMERIC`
    +* `BOOLEAN`
    +* `VARCHAR`, `BPCHAR`, `TEXT`
    +* `DATE`
    +* `TIMESTAMP`
    +* `BYTEA`
     
    -where `<custom-parameters>` is:
    +The `<full_external_table_name>` (see below) **must not match** the [pattern](https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html) `/.*/[0-9]*-[0-9]*_[0-9]*` (the name must not start with `/` and have an ending that consists of `/` and three groups of numbers of arbitrary length, the first two separated by `-` and the last two separated by `_`. For example, the following table name is not allowed: `/public.table/1-2_3`).
     
    -    JDBC_DRIVER=<jdbc-driver-class-name>
    -     &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password>
    +At the moment, one PXF external table cannot serve both SELECT and INSERT queries. A separate PXF external table is required for each type of query.
     
    -## Jdbc Table to HAWQ Data Type Mapping
    -Jdbc-table and hawq-table data type system is similar to, does not require
    -a special type of mapping.
    -# Usage
    -The following to mysql, for example, describes the use of PDF-JDBC.
     
    -To query MySQL Table in HAWQ, perform the following steps:
    -1. create Table in MySQL
    +## Syntax
    +```
    +CREATE [ READABLE | WRITABLE ] EXTERNAL TABLE <table_name> (
    +    { <column_name> <data_type> [, ...] | LIKE <other_table> }
    +)
    +LOCATION (
    +    'pxf://<full_external_table_name>?<pxf_parameters><jdbc_required_parameters><jdbc_login_parameters><plugin_parameters>'
    +)
    +FORMAT 'CUSTOM' (FORMATTER={'pxfwritable_import' | 'pxfwritable_export'})
    +```
     
    -         mysql> use demodb;
    -         mysql> create table myclass(
    -                 id int(4) not null primary key,
    -                 name varchar(20) not null,
    -                 gender int(4) not null default '0',
    -                 degree double(16,2));`
    -2. insert test data
    +The **`<pxf_parameters>`** are:
    +```
    +{
    +PROFILE=JDBC
    +|
    +FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
    +&ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor
    +&RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcResolver
    +}
    +```
     
    -        insert into myclass values(1,"tom",1,90);
    -        insert into myclass values(2,'john',0,94);
    -        insert into myclass values(3,'simon',1,79);
    -3. copy mysql-jdbc jar files to `/usr/lib/pxf` (on all cluster nodes), and
    -edit `/etc/pxf/conf/pxf-public.classpath` , add :
    +The **`<jdbc_required_parameters>`** are:
    +```
    +&JDBC_DRIVER=<external_database_jdbc_driver>
    +&DB_URL=<external_database_url>
    +```
     
    -        /usr/lib/pxf/mysql-connector-java-*.jar
    +The **`<jdbc_login_parameters>`** are **optional**, but if provided, both of them must be present:
    +```
    +&USER=<external_database_login>
    +&PASS=<external_database_password>
    +```
     
    -     Restart all pxf-engine.
    +The **`<plugin_parameters>`** are **optional**.
     
    -4. create Table in HAWQ:
    +The meaning of `BATCH_SIZE` is given in section [batching of INSERT queries](#Batching).
     
    -        gpadmin=# CREATE EXTERNAL TABLE myclass(id integer,
    -             name text,
    -             gender integer,
    -             degree float8)
    -             LOCATION ('pxf://localhost:51200/demodb.myclass'
    -                     '?PROFILE=JDBC'
    -                     '&JDBC_DRIVER=com.mysql.jdbc.Driver'
    -                     '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
    -                     )
    -            FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
    +The meaning of other parameters is given in section [partitioning](#Partitioning).
    +```
    +{
    +&BATCH_SIZE=<batch_size>
    +|
    +&PARTITION_BY=<column>:<column_type>
    +&RANGE=<start_value>:<end_value>
    +[&INTERVAL=<value>[:<unit>]]
    +}
    +```
     
    -MySQL instance IP: 192.168.200.6, port: 3306.
     
    -5. query mysql data in HAWQ:
    +## SELECT queries
     
    -        gpadmin=# select * from myclass;
    -        gpadmin=# select * from myclass where id=2;
    +The PXF JDBC plugin allows performing SELECT queries on external tables.
     
    -# Jdbc Table Fragments
    -## intro
    -PXF-JDBC plug-in as a  client to access jdbc database.By default, there is
    -only one pxf-instance connectied JDBC Table.If the jdbc table data is large,
    -you can also use multiple pxf-instance to access the JDBC table by fragments.
    +To perform SELECT queries, create an `EXTERNAL READABLE TABLE` or just an `EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')` in PXF.
     
    -## Syntax
    -where `<custom-parameters>` can use following partition parameters:
    -
    -    PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]
    -The `PARTITION_BY` parameter indicates which  column to use as the partition column.
    -It can be split by colon(':'),the `column_type` current supported : `date|int|enum` .
    -The Date format is `yyyy-MM-dd`.
    -The `PARTITION_BY` parameter can be null, and there will be only one fragment.
    -
    -The `RANGE` parameter indicates the range of data to be queried , it can be split by colon(':').
    - The range is left-closed, ie: `>= start_value AND < end_value` .
    -
    -The `INTERVAL` parameter can be split by colon(':'), indicate the interval
    - value of one fragment. When `column_type` is `date`,this parameter must
    - be split by colon, and `interval_unit` can be `year|month|day`. When
    - `column_type` is int, the `interval_unit` can be empty. When `column_type`
    - is enum,the `INTERVAL` parameter can be empty.
    -
    -The syntax examples is :
    -
    -    * PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'
    -    * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1
    -    * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad
    -
    -## Usage
    -MySQL Table:
    -
    -    CREATE TABLE sales (id int primary key, cdate date, amt decimal(10,2),grade varchar(30))
    -HAWQ Table:
    -
    -    CREATE EXTERNAL TABLE sales(id integer,
    -                 cdate date,
    -                 amt float8,
    -                 grade text)
    -                 LOCATION ('pxf://localhost:51200/sales'
    -                         '?PROFILE=JDBC'
    -                         '&JDBC_DRIVER=com.mysql.jdbc.Driver'
    -                         '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
    -                         '&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year'
    -                         )
    -                 FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
    -At PXF-JDBC plugin,this will generate 2 fragments.Then HAWQ assign these fragments to 2 PXF-instance
    -to access jdbc table data.
    \ No newline at end of file
    +The `BATCH_SIZE` parameter is *not used* in such tables. *However*, if this parameter is present, its value will be checked for correctness (it must be an integer).
    +
    +
    +## INSERT queries
    +
    +The PXF JDBC plugin allows performing INSERT queries on external tables.
    +
    +To perform INSERT queries, create an `EXTERNAL WRITABLE TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')` in PXF.
    +
    +The `PARTITION_BY`, `RANGE` and `INTERVAL` parameters are ignored in such tables.
    +
    +
    +### Batching
    +
    +INSERT queries can be batched. This may significantly increase performance if batching is supported by the external database.
    +
    +If an **external database does not support transactions** and the INSERTion of one of the tuples failed, some tuples still may be inserted into the external database. The actual result depends on the external database.
    +
    +To enable batching, create an external table with the parameter `BATCH_SIZE` set to:
    +* `integer > 1`. Batches of the given size will be used;
    +* `integer < 0`. A batch of infinite size will be used (all tuples will be sent in one huge JDBC query). Note that this behaviour **may cause errors**, as each database has its own limit on the size of JDBC queries;
    +* `0` or `1`. Batching will not be used.
    +
    +Batching must be supported by the JDBC driver of the external database. If the driver does not support batching, it will not be used; the PXF plugin will still try to INSERT the values, and an informational message will be added to the PXF logs.
    +
    +By default (`BATCH_SIZE` is absent), batching is not used.
    +
    +
    +## Partitioning
    +
    +PXF JDBC plugin supports simultaneous *read* access to an external table from multiple PXF segments. This function is called partitioning.
    +
    +
    +### Syntax
    +Use the following `<plugin_parameters>` (mentioned above) to activate partitioning:
    +
    +```
    +&PARTITION_BY=<column>:<column_type>
    +&RANGE=<start_value>:<end_value>
    +[&INTERVAL=<value>[:<unit>]]
    +```
    +
    +* The `PARTITION_BY` parameter indicates which column to use as a partition column. Only one column can be used as the partition column
    +    * The `<column>` is the name of a partition column;
    +    * The `<column_type>` is the data type of a partition column. At the moment, the **supported types** are `INT`, `DATE` and `ENUM`.
    +
    +* The `RANGE` parameter indicates the range of data to be queried.
    +    * If the partition type is `ENUM`, the `RANGE` parameter must be a list of values, each of which forms its own fragment;
    +    * If the partition type is `INT` or `DATE`, the `RANGE` parameter must be a finite left-closed range ( `... >= start_value AND ... < end_value`);
    +    * For `DATE` partitions, the date format must be `yyyy-MM-dd`.
    +
    +* The `INTERVAL` parameter is **required** for `INT` and `DATE` partitions. It is ignored if `<column_type>` is `ENUM`.
    +    * The `<value>` is the size of each fragment (the last one may be made smaller by the plugin);
    +    * The `<unit>` **must** be provided if `<column_type>` is `DATE`. `year`, `month` and `day` are supported. This parameter is ignored in case of any other `<column_type>`.
    +
    +Example partitions:
    +* `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2`
    +* `&PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month`
    +* `&PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad`
    +
    +
    +### Mechanism
    +
    +Note that **by default PXF does not support more than 100 fragments**.
    --- End diff --
    
    This was incorrect. I have tested this case; the mentioned limitation does not exist.
    
    Many thanks for this comment!
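
For readers following the partitioning rules quoted in the README diff above, here is a minimal sketch of how an `INT` partition with `RANGE=<start_value>:<end_value>` and `INTERVAL=<value>` could be split into left-closed fragments. The class `IntPartitionSketch` and its `fragments` helper are hypothetical names used only for illustration; the real `JdbcPartitionFragmenter` may compute fragments differently.

```java
import java.util.ArrayList;
import java.util.List;

public class IntPartitionSketch {
    /**
     * Split [start, end) into consecutive left-closed ranges of the given size.
     * The last range is shortened so that it never exceeds 'end'.
     * (Hypothetical helper, not part of the PXF API.)
     */
    static List<long[]> fragments(long start, long end, long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("INTERVAL must be positive");
        }
        List<long[]> result = new ArrayList<>();
        for (long lo = start; lo < end; lo += interval) {
            long hi = Math.min(lo + interval, end);
            result.add(new long[] {lo, hi});
        }
        return result;
    }

    public static void main(String[] args) {
        // Print the WHERE constraint each fragment would correspond to
        for (long[] f : fragments(42, 142, 30)) {
            System.out.println("id >= " + f[0] + " AND id < " + f[1]);
        }
    }
}
```

Under these assumptions, `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2` would yield 50 fragments, which is why the question of a fragment limit matters for small intervals.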


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214452070
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java ---
    @@ -0,0 +1,364 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadResolver;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.WriteResolver;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables resolver
    + */
    +public class JdbcResolver extends JdbcPlugin implements ReadResolver, WriteResolver {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcResolver(InputData input) throws UserDataException {
    +        super(input);
    +    }
    +
    +    /**
    +     * getFields() implementation
    +     *
    +     * @throws SQLException if the provided {@link OneRow} object is invalid
    +     */
    +    @Override
    +    public List<OneField> getFields(OneRow row) throws SQLException {
    +        ResultSet result = (ResultSet) row.getData();
    +        LinkedList<OneField> fields = new LinkedList<>();
    +
    +        for (ColumnDescriptor column : columns) {
    +            String colName = column.columnName();
    +            Object value = null;
    +
    +            OneField oneField = new OneField();
    +            oneField.type = column.columnTypeCode();
    +
    +            switch (DataType.get(oneField.type)) {
    +                case INTEGER:
    +                    value = result.getInt(colName);
    +                    break;
    +                case FLOAT8:
    +                    value = result.getDouble(colName);
    +                    break;
    +                case REAL:
    +                    value = result.getFloat(colName);
    +                    break;
    +                case BIGINT:
    +                    value = result.getLong(colName);
    +                    break;
    +                case SMALLINT:
    +                    value = result.getShort(colName);
    +                    break;
    +                case BOOLEAN:
    +                    value = result.getBoolean(colName);
    +                    break;
    +                case BYTEA:
    +                    value = result.getBytes(colName);
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case NUMERIC:
    +                    value = result.getString(colName);
    +                    break;
    +                case DATE:
    +                    value = result.getDate(colName);
    +                    break;
    +                case TIMESTAMP:
    +                    value = result.getTimestamp(colName);
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +            }
    +
    +            oneField.val = value;
    +            fields.add(oneField);
    +        }
    +        return fields;
    +    }
    +
    +    /**
    +     * setFields() implementation
    +     *
    +     * @return OneRow with the data field containing a List<OneField>
    +     * OneFields are not reordered before being passed to Accessor; at the
    +     * moment, there is no way to correct the order of the fields if it is wrong.
    +     * In practice, the 'record' provided is always ordered the right way.
    +     *
    +     * @throws UnsupportedOperationException if field of some type is not supported
    +     */
    +    @Override
    +    public OneRow setFields(List<OneField> record) throws UnsupportedOperationException, ParseException {
    +        int column_index = 0;
    +        for (OneField oneField : record) {
    +            ColumnDescriptor column = columns.get(column_index);
    +            if (
    +                LOG.isDebugEnabled() &&
    +                DataType.get(column.columnTypeCode()) != DataType.get(oneField.type)
    +            ) {
    +                LOG.warn("The provided tuple of data may be disordered. Datatype of column with descriptor '" + column.toString() + "' must be '" + DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + DataType.get(oneField.type).toString() + "'");
    +            }
    +
    +            // Check that data type is supported
    +            switch (DataType.get(oneField.type)) {
    +                case BOOLEAN:
    +                case INTEGER:
    +                case FLOAT8:
    +                case REAL:
    +                case BIGINT:
    +                case SMALLINT:
    +                case NUMERIC:
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case BYTEA:
    +                case TIMESTAMP:
    +                case DATE:
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +            }
    +
    +            if (LOG.isDebugEnabled()) {
    +                if (DataType.get(oneField.type) == DataType.BYTEA) {
    +                    LOG.debug("OneField content (conversion from BYTEA): '" + new String((byte[])oneField.val) + "'");
    +                }
    +            }
    +
    +            // Convert TEXT columns into native data types
    +            if ((oneField.val != null) && (DataType.get(oneField.type) == DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
    +                String rawVal = (String)oneField.val;
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("OneField content (conversion from TEXT): '" + rawVal + "'");
    +                }
    +                switch (DataType.get(column.columnTypeCode())) {
    +                    case VARCHAR:
    +                    case BPCHAR:
    +                    case TEXT:
    +                    case BYTEA:
    +                        break;
    +                    case BOOLEAN:
    +                        oneField.val = (Object)Boolean.parseBoolean(rawVal);
    +                        break;
    +                    case INTEGER:
    +                        oneField.val = (Object)Integer.parseInt(rawVal);
    +                        break;
    +                    case FLOAT8:
    +                        oneField.val = (Object)Double.parseDouble(rawVal);
    +                        break;
    +                    case REAL:
    +                        oneField.val = (Object)Float.parseFloat(rawVal);
    +                        break;
    +                    case BIGINT:
    +                        oneField.val = (Object)Long.parseLong(rawVal);
    +                        break;
    +                    case SMALLINT:
    +                        oneField.val = (Object)Short.parseShort(rawVal);
    +                        break;
    +                    case NUMERIC:
    +                        oneField.val = (Object)new BigDecimal(rawVal);
    +                        break;
    +                    case TIMESTAMP:
    +                        boolean isConversionSuccessful = false;
    +                        for (SimpleDateFormat sdf : timestampSDFs.get()) {
    +                            try {
    +                                java.util.Date parsedTimestamp = sdf.parse(rawVal);
    +                                oneField.val = (Object)new Timestamp(parsedTimestamp.getTime());
    +                                isConversionSuccessful = true;
    +                                break;
    +                            }
    +                            catch (ParseException e) {
    +                                // pass
    +                            }
    +                        }
    +                        if (!isConversionSuccessful) {
    +                            throw new ParseException(rawVal, 0);
    +                        }
    +                        break;
    +                    case DATE:
    +                        oneField.val = (Object)new Date(dateSDF.get().parse(rawVal).getTime());
    +                        break;
    +                    default:
    +                        throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
    +                }
    +                oneField.type = column.columnTypeCode();
    +            }
    +
    +            column_index += 1;
    +        }
    +        return new OneRow(new LinkedList<OneField>(record));
    +    }
    +
    +    /**
    +     * Decode OneRow object and pass all its contents to a PreparedStatement
    +     *
    +     * @throws IOException if data in a OneRow is corrupted
    +     * @throws SQLException if the given statement is broken
    +     */
    +    @SuppressWarnings("unchecked")
    +    public static void decodeOneRowToPreparedStatement(OneRow row, PreparedStatement statement) throws IOException, SQLException {
    +        // This is safe: OneRow comes from JdbcResolver
    +        List<OneField> tuple = (List<OneField>)row.getData();
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BIGINT);
    +                    }
    +                    else {
    +                        statement.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.SMALLINT);
    +                    }
    +                    else {
    +                        statement.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.REAL);
    +                    }
    +                    else {
    +                        statement.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statement.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statement.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statement.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statement.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statement.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statement.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statement.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    +            }
    +        }
    +    }
    +
    +    private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
    +
    +    // SimpleDateFormat to parse TEXT into DATE
    +    private static ThreadLocal<SimpleDateFormat> dateSDF = new ThreadLocal<SimpleDateFormat>() {
    --- End diff --
    
    Thanks, right. Looking more into the details of the ThreadLocal implementation, it seems there should be no issue with GC when a thread finishes. We also don't store request-specific state there, so there is no concern about cleaning it up when the thread is returned to the pool. So we can consider this question closed and move to thread-safe formatters as a further improvement later on.
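
As an illustration of what a thread-safe formatter could look like, here is a minimal sketch that parses a `yyyy-MM-dd` string into a `java.sql.Date` with `java.time.format.DateTimeFormatter`, which is immutable and can be shared across threads without a `ThreadLocal`. The class name `ThreadSafeDateParsing` and the `parseDate` helper are hypothetical and not part of this PR; the sketch assumes Java 8 or later.

```java
import java.sql.Date;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class ThreadSafeDateParsing {
    // DateTimeFormatter is immutable, so one shared instance is enough.
    // ISO_LOCAL_DATE accepts dates such as 2018-04-09.
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE;

    /** Parse TEXT into a java.sql.Date (hypothetical helper). */
    static Date parseDate(String rawVal) {
        return Date.valueOf(LocalDate.parse(rawVal, DATE_FORMAT));
    }

    public static void main(String[] args) {
        System.out.println(parseDate("2018-04-09"));
    }
}
```

Unlike `SimpleDateFormat.parse`, `LocalDate.parse` throws an unchecked `DateTimeParseException` on malformed input, so a caller such as `setFields()` would need to handle that instead of `ParseException`.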


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    @kapustor we can't fail safely when something goes wrong with at least one fragment without dealing with the complexity of distributed transactions.
    So transaction handling doesn't look necessary in this PR.
    I would recommend clearly stating in the documentation that the JDBC plugin doesn't guarantee a consistent outcome.
    Furthermore, we should set clear expectations for users and ask them to use staging tables when inserting data from Greenplum. That would allow us to simplify the code and get rid of the cumbersome rollback logic.
    
    WRT:
    >  As for sending all the data in one batch - as I understand, such approach will limit the max data size by memory available for PXF service, because PXF have to aggregate the batch in its memory.
    - this is not exactly accurate, since PXF is designed to be a lightweight layer between external storage and the consumer, so no aggregation in memory is needed (except for some reasonable buffering, depending on I/O and network sockets).
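
To make the "reasonable buffering" point concrete, here is a minimal sketch of a bounded batch buffer that follows the `BATCH_SIZE` rules described in the README of this PR (`> 1` flushes every N tuples, `0` or `1` flushes each tuple, `< 0` flushes only at the end). The class `BoundedBatchWriter` and its `sink` callback are hypothetical; in the plugin the flush would presumably correspond to `PreparedStatement.executeBatch()`, which is not shown here so that the sketch runs without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BoundedBatchWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // receives each flushed batch
    private final List<T> buffer = new ArrayList<>();

    public BoundedBatchWriter(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffer one tuple and flush when the batch is full. */
    public void write(T tuple) {
        buffer.add(tuple);
        // A negative size means "infinite": nothing is flushed until close()
        if (batchSize >= 0 && buffer.size() >= Math.max(batchSize, 1)) {
            flush();
        }
    }

    /** Flush whatever is left in the buffer. */
    public void close() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new ArrayList<>(buffer)); // hand over a copy of the batch
        buffer.clear();
    }
}
```

With a positive batch size, memory use is bounded by one batch, whereas the "infinite" setting holds every tuple until `close()`, which is the case the quoted concern about PXF memory applies to.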


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r193735632
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +        if (dbMeta.supportsTransactions()) {
    +            dbConn.setAutoCommit(false);
    +        }
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = dbConn.prepareStatement(queryWrite);
    +
    +        if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) {
    +            LOG.info(
    --- End diff --
    
    Fixed: https://github.com/apache/incubator-hawq/pull/1353/commits/7fe4cac858a64ed5260d0e86342980f355fe7eb1#diff-6829beb5b62de0a4880b5d5dbebca91aR139


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214145568
  
    --- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
    @@ -177,11 +177,11 @@ under the License.
         </profile>
         <profile>
             <name>Jdbc</name>
    -        <description>A profile for reading data into HAWQ via JDBC</description>
    +        <description>A profile to access (read & write) data via JDBC</description>
    --- End diff --
    
    This description is supposed to be user-readable rather than machine-readable, so just replace "&" with the word "and" to avoid issues.
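
One issue a raw "&" can cause in this file is, presumably, XML well-formedness: an unescaped ampersand is not allowed in element content. The sketch below checks three variants of the description with the JDK's built-in parser. `DescriptionCheck` and `isWellFormed` are hypothetical names used only for illustration; they are not part of PXF.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical helper, not part of PXF: reports whether a snippet is well-formed XML
final class DescriptionCheck {
    static boolean isWellFormed(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler rethrows fatal errors and keeps the parser from printing to stderr
            builder.setErrorHandler(new DefaultHandler());
            builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return true;
        } catch (SAXException e) {
            // The parser rejected the document
            return false;
        } catch (Exception e) {
            // Parser configuration or I/O problems are not expected for an in-memory string
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isWellFormed("<description>read & write</description>"));
        System.out.println(isWellFormed("<description>read and write</description>"));
        System.out.println(isWellFormed("<description>read &amp; write</description>"));
    }
}
```

Writing "and" sidesteps the problem entirely and keeps the text readable in the raw file, whereas "&amp;" is valid XML but harder for a person to read.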


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214149512
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---
    @@ -0,0 +1,113 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +
    +/**
    + * This writer makes batch INSERTs.
    + *
    + * A call() is required after a certain number of supply() calls
    + */
    +class BatchWriterCallable implements WriterCallable {
    +    @Override
    +    public void supply(OneRow row) throws IllegalStateException {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
    +        }
    +        if (row == null) {
    +            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
    +        }
    +        rows.add(row);
    +    }
    +
    +    @Override
    +    public boolean isCallRequired() {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    @Override
    +    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
    +        if (rows.isEmpty()) {
    +            return null;
    +        }
    +
    +        boolean statementMustBeDeleted = false;
    +        if (statement == null) {
    +            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
    +            statementMustBeDeleted = true;
    +        }
    +
    +        for (OneRow row : rows) {
    +            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
    +            statement.addBatch();
    +        }
    +
    +        try {
    +            statement.executeBatch();
    +        }
    +        catch (SQLException e) {
    +            return e;
    +        }
    +        finally {
    +            rows.clear();
    +            if (statementMustBeDeleted) {
    +                JdbcPlugin.closeStatement(statement);
    +                statement = null;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    /**
    +     * Construct a new batch writer
    +     */
    +    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int maxRowsCount) throws IllegalArgumentException {
    +        if ((plugin == null) || (query == null)) {
    +            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
    +        }
    +        this.plugin = plugin;
    +        this.query = query;
    +        this.statement = statement;
    +        if (maxRowsCount < 0) {
    +            maxRowsCount = 0;
    --- End diff --
    
    An unlimited batch size might cause PXF to run out of memory if the user forgets to set the batch size in the table definition. For example, on a small cluster with 5 nodes and 5 PXF instances, exporting a 50G dataset from the cluster via PXF might put 10G+ of data on each PXF instance, while by default PXF is given only 2G of memory.
    
    Going with a smaller default batch size will ensure that batching still works on systems with the default memory setting (2G). Users can redefine the batch size to achieve the best performance, if needed.
    
    Oracle recommends setting the batch size to 50-100 (https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754), so maybe 100 is actually a better, safe default?
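
The bounded-default idea from this comment can be sketched as follows. This is a minimal illustration and not the PR's code: `BoundedBatcher`, `DEFAULT_BATCH_SIZE` and the `sink` callback are hypothetical names, and treating a non-positive size as "not set" is an assumption made here (the PR itself treats 0 as "no batching" and negative values as an unlimited batch). In the plugin, the sink would presumably be the place where `addBatch()` and `executeBatch()` are called on the `PreparedStatement`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: buffers rows and hands them to a sink in batches of bounded size
final class BoundedBatcher<T> {
    // Assumed default, taken from the value of 100 suggested in the review
    static final int DEFAULT_BATCH_SIZE = 100;

    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BoundedBatcher(int requestedBatchSize, Consumer<List<T>> sink) {
        // Assumption of this sketch: a non-positive value means "not set by the user",
        // so the bounded default is used instead of an unlimited batch
        this.batchSize = requestedBatchSize > 0 ? requestedBatchSize : DEFAULT_BATCH_SIZE;
        this.sink = sink;
    }

    int batchSize() {
        return batchSize;
    }

    // Buffer one row; flush as soon as the batch is full so memory use stays bounded
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hand the buffered rows (if any) to the sink and start a new batch
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        BoundedBatcher<Integer> batcher = new BoundedBatcher<>(0, batch -> sizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            batcher.add(i);
        }
        batcher.flush(); // the final partial batch must be flushed explicitly
        System.out.println(sizes); // prints [100, 100, 50]
    }
}
```

With this shape, at most `batchSize` rows are held in memory at a time, regardless of how much data a single PXF instance receives.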


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r193736091
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +        if (dbMeta.supportsTransactions()) {
    +            dbConn.setAutoCommit(false);
    +        }
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = dbConn.prepareStatement(queryWrite);
    +
    +        if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) {
    +            LOG.info(
    +                "The database '" +
    +                dbMeta.getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     */
    +    @Override
    +    @SuppressWarnings("unchecked")
    +    public boolean writeNextObject(OneRow row) throws SQLException, IOException {
    +        // This cast is safe because the data in the row is formed by JdbcPlugin
    +        List<OneField> tuple = (List<OneField>) row.getData();
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("writeNextObject() called");
    +        }
    +
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Field " + i + ": " + DataType.get(field.type).toString());
    +            }
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statementWrite.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BIGINT);
    +                    }
    +                    else {
    +                        statementWrite.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.SMALLINT);
    +                    }
    +                    else {
    +                        statementWrite.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.FLOAT);
    +                    }
    +                    else {
    +                        statementWrite.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statementWrite.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statementWrite.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statementWrite.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statementWrite.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statementWrite.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statementWrite.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statementWrite.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    +            }
    +        }
    +
    +        boolean rollbackRequired = false;
    +        SQLException rollbackException = null;
    +        if (batchSize < 0) {
    +            // Batch has an infinite size
    +            statementWrite.addBatch();
    +        }
    +        else if ((batchSize == 0) || (batchSize == 1)) {
    +            // Batching is not used
    +            try {
    +                rollbackRequired = (statementWrite.executeUpdate() != 1);
    +                if (rollbackRequired) {
    +                    rollbackException = new SQLException("The number of rows affected by INSERT query is incorrect");
    +                }
    +            }
    +            catch (SQLException e) {
    +                rollbackRequired = true;
    +                rollbackException = e;
    +            }
    +        }
    +        else {
    +            // Batch has a finite size
    +            statementWrite.addBatch();
    +            batchSizeCurrent += 1;
    +            if (batchSizeCurrent >= batchSize) {
    +                batchSizeCurrent = 0;
    +                try {
    +                    statementWrite.executeBatch();
    +                    statementWrite.clearBatch();
    +                }
    +                catch (SQLException e) {
    +                    rollbackRequired = true;
    +                    rollbackException = e;
    +                }
    +            }
    +        }
    +
    +        if (rollbackRequired) {
    +            rollbackException = tryRollback(rollbackException);
    +            throw rollbackException;
    +        }
    +
    +        return true;
    +    }
    +
    +    /**
    +     * closeForWrite() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForWrite() throws SQLException {
    +        try {
    +            if ((dbConn != null) && (statementWrite != null) && (!statementWrite.isClosed())) {
    +                // If batching was used, execute the batch
    +                if ((batchSize < 0) || ((batchSize > 1) && (batchSizeCurrent > 0))) {
    +                    try {
    +                        statementWrite.executeBatch();
    +                    }
    +                    catch (SQLException e) {
    +                        e = tryRollback(e);
    +                        throw e;
    +                    }
    +                }
    +                if (dbMeta.supportsTransactions()) {
    +                    dbConn.commit();
    +                }
    +            }
    +        }
    +        finally {
    +            super.closeConnection();
    +        }
    +    }
    +
    +
    +    /**
    +     * Build SELECT query (with "WHERE" and partition constraints)
    +     *
    +     * @return Complete SQL query
    +     *
    +     * @throws ParseException if the constraints passed in InputData are incorrect
    +     * @throws SQLException if the database metadata is invalid
    +     */
    +    private String buildSelectQuery() throws ParseException, SQLException {
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("SELECT ");
    +
    +        // Insert columns' names
    +        String columnDivisor = "";
    +        for (ColumnDescriptor column : columns) {
    +            sb.append(columnDivisor);
    +            columnDivisor = ", ";
    +            sb.append(column.columnName());
    +        }
    +
    +        // Insert the table name
    +        sb.append(" FROM ").append(tableName);
    +
    +        // Insert regular WHERE constraints
    +        (new WhereSQLBuilder(inputData)).buildWhereSQL(dbMeta.getDatabaseProductName(), sb);
    +
    +        // Insert partition constraints
    +        JdbcPartitionFragmenter.buildFragmenterSql(inputData, dbMeta.getDatabaseProductName(), sb);
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Build INSERT query template (field values are replaced by placeholders '?')
    +     *
    +     * @return SQL query with placeholders instead of actual values
    +     */
    +    private String buildInsertQuery() {
    +        StringBuilder sb = new StringBuilder();
    +
    +        sb.append("INSERT INTO ");
    +
    +        // Insert the table name
    +        sb.append(tableName);
    +
    +        // Insert columns' names
    +        sb.append("(");
    +        String fieldDivisor = "";
    +        for (ColumnDescriptor column : columns) {
    +            sb.append(fieldDivisor);
    +            fieldDivisor = ", ";
    +            sb.append(column.columnName());
    +        }
    +        sb.append(")");
    +
    +        sb.append(" VALUES ");
    +
    +        // Insert values placeholders
    +        sb.append("(");
    +        fieldDivisor = "";
    +        for (int i = 0; i < columns.size(); i++) {
    +            sb.append(fieldDivisor);
    +            fieldDivisor = ", ";
    +            sb.append("?");
    +        }
    +        sb.append(")");
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Try to rollback the current transaction
    +     *
    +     * @param rollbackException the rollback cause
    +     *
    +     * @return The resulting SQLException (it may differ from 'rollbackException')
    +     */
    +    private SQLException tryRollback(SQLException rollbackException) {
    +        // Get transactions support status
    +        boolean areTransactionsSupported = false;
    +        try {
    +            areTransactionsSupported = dbMeta.supportsTransactions();
    +        }
    +        catch (SQLException e) {
    +            // This error is unexpected; the dbMeta is probably corrupted
    +            return rollbackException;
    +        }
    +
    +        if (!areTransactionsSupported) {
    +            // The bad case: transactions are NOT supported
    +            if ((batchSize < 0) || (batchSize > 1)) {
    +                // The worst case: batching was used
    +                return new SQLException(failedInsertMessage + "The exact number of tuples inserted is unknown.", rollbackException);
    +            }
    +            // Batching was NOT used
    +            return new SQLException(failedInsertMessage + "The exact number of tuples inserted can be found in PXF logs.", rollbackException);
    +        }
    +
    +        // The best case: transactions are supported
    +        try {
    +            dbConn.rollback();
    +        }
    +        catch (SQLException e) {
    +            // log exception as error, but do not throw it (the actual cause of rollback will be thrown instead)
    +            LOG.error("An exception happened during the transaction rollback: '" + e.toString() + "'");
    +        }
    +        return rollbackException;
    +    }
    +
    +
    +    private static final Log LOG = LogFactory.getLog(JdbcAccessor.class);
    +
    +    private static final String failedInsertMessage = "Insert failed due to an SQLException. The target database does not support transactions and SOME DATA MAY HAVE BEEN INSERTED. ";
    --- End diff --
    
    Fixed: https://github.com/apache/incubator-hawq/pull/1353/commits/7fe4cac858a64ed5260d0e86342980f355fe7eb1#diff-6829beb5b62de0a4880b5d5dbebca91aR458


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r182616117
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +        if (dbMeta.supportsTransactions()) {
    +            dbConn.setAutoCommit(false);
    +        }
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = dbConn.prepareStatement(queryWrite);
    +
    +        if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) {
    +            LOG.info(
    +                "The database '" +
    +                dbMeta.getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     */
    +    @Override
    +    @SuppressWarnings("unchecked")
    +    public boolean writeNextObject(OneRow row) throws SQLException, IOException {
    +        // This cast is safe because the data in the row is formed by JdbcPlugin
    +        List<OneField> tuple = (List<OneField>) row.getData();
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("writeNextObject() called");
    +        }
    +
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Field " + i + ": " + DataType.get(field.type).toString());
    +            }
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statementWrite.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BIGINT);
    +                    }
    +                    else {
    +                        statementWrite.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.SMALLINT);
    +                    }
    +                    else {
    +                        statementWrite.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.REAL);
    +                    }
    +                    else {
    +                        statementWrite.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statementWrite.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statementWrite.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statementWrite.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statementWrite.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statementWrite.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statementWrite.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statementWrite.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    +            }
    +        }
    +
    +        boolean rollbackRequired = false;
    +        SQLException rollbackException = null;
    +        if (batchSize < 0) {
    +            // Batch has an infinite size
    +            statementWrite.addBatch();
    +        }
    +        else if ((batchSize == 0) || (batchSize == 1)) {
    +            // Batching is not used
    +            try {
    +                rollbackRequired = (statementWrite.executeUpdate() != 1);
    +                if (rollbackRequired) {
    +                    rollbackException = new SQLException("The number of rows affected by INSERT query is incorrect");
    +                }
    +            }
    +            catch (SQLException e) {
    +                rollbackRequired = true;
    +                rollbackException = e;
    +            }
    +        }
    +        else {
    +            // Batch has a finite size
    +            statementWrite.addBatch();
    +            batchSizeCurrent += 1;
    +            if (batchSizeCurrent >= batchSize) {
    +                batchSizeCurrent = 0;
    +                try {
    +                    statementWrite.executeBatch();
    +                    statementWrite.clearBatch();
    +                }
    +                catch (SQLException e) {
    +                    rollbackRequired = true;
    +                    rollbackException = e;
    +                }
    +            }
    +        }
    +
    +        if (rollbackRequired) {
    +            rollbackException = tryRollback(rollbackException);
    +            throw rollbackException;
    +        }
    +
    +        return true;
    +    }
    +
    +    /**
    +     * closeForWrite() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForWrite() throws SQLException {
    +        try {
    +            if ((dbConn != null) && (statementWrite != null) && (!statementWrite.isClosed())) {
    +                // If batching was used, execute the batch
    +                if ((batchSize < 0) || ((batchSize > 1) && (batchSizeCurrent > 0))) {
    +                    try {
    +                        statementWrite.executeBatch();
    +                    }
    +                    catch (SQLException e) {
    +                        e = tryRollback(e);
    +                        throw e;
    +                    }
    +                }
    +                if (dbMeta.supportsTransactions()) {
    +                    dbConn.commit();
    +                }
    +            }
    +        }
    +        finally {
    +            super.closeConnection();
    +        }
    +    }
    +
    +
    +    /**
    +     * Build SELECT query (with "WHERE" and partition constraints)
    +     *
    +     * @return Complete SQL query
    +     *
    +     * @throws ParseException if the constraints passed in InputData are incorrect
    +     * @throws SQLException if the database metadata is invalid
    +     */
    +    private String buildSelectQuery() throws ParseException, SQLException {
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("SELECT ");
    +
    +        // Insert columns' names
    +        String columnDivisor = "";
    +        for (ColumnDescriptor column : columns) {
    +            sb.append(columnDivisor);
    +            columnDivisor = ", ";
    +            sb.append(column.columnName());
    +        }
    +
    +        // Insert the table name
    +        sb.append(" FROM ").append(tableName);
    +
    +        // Insert regular WHERE constraints
    +        (new WhereSQLBuilder(inputData)).buildWhereSQL(dbMeta.getDatabaseProductName(), sb);
    +
    +        // Insert partition constraints
    +        JdbcPartitionFragmenter.buildFragmenterSql(inputData, dbMeta.getDatabaseProductName(), sb);
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Build INSERT query template (field values are replaced by placeholders '?')
    +     *
    +     * @return SQL query with placeholders instead of actual values
    +     */
    +    private String buildInsertQuery() {
    +        StringBuilder sb = new StringBuilder();
    +
    +        sb.append("INSERT INTO ");
    +
    +        // Insert the table name
    +        sb.append(tableName);
    +
    +        // Insert columns' names
    +        sb.append("(");
    +        String fieldDivisor = "";
    +        for (ColumnDescriptor column : columns) {
    +            sb.append(fieldDivisor);
    +            fieldDivisor = ", ";
    +            sb.append(column.columnName());
    +        }
    +        sb.append(")");
    +
    +        sb.append(" VALUES ");
    +
    +        // Insert values placeholders
    +        sb.append("(");
    +        fieldDivisor = "";
    +        for (int i = 0; i < columns.size(); i++) {
    +            sb.append(fieldDivisor);
    +            fieldDivisor = ", ";
    +            sb.append("?");
    +        }
    +        sb.append(")");
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Try to rollback the current transaction
    +     *
    +     * @param rollbackException the rollback cause
    +     *
    +     * @return The resulting SQLException (it may differ from 'rollbackException')
    +     */
    +    private SQLException tryRollback(SQLException rollbackException) {
    +        // Get transactions support status
    +        boolean areTransactionsSupported = false;
    +        try {
    +            areTransactionsSupported = dbMeta.supportsTransactions();
    +        }
    +        catch (SQLException e) {
    +            // This error is unexpected; the dbMeta is probably corrupted
    +            return rollbackException;
    +        }
    +
    +        if (!areTransactionsSupported) {
    +            // The bad case: transactions are NOT supported
    +            if ((batchSize < 0) || (batchSize > 1)) {
    +                // The worst case: batching was used
    +                return new SQLException(failedInsertMessage + "The exact number of tuples inserted is unknown.", rollbackException);
    +            }
    +            // Batching was NOT used
    +            return new SQLException(failedInsertMessage + "The exact number of tuples inserted can be found in PXF logs.", rollbackException);
    +        }
    +
    +        // The best case: transactions are supported
    +        try {
    +            dbConn.rollback();
    +        }
    +        catch (SQLException e) {
    +            // log exception as error, but do not throw it (the actual cause of rollback will be thrown instead)
    +            LOG.error("An exception happened during the transaction rollback: '" + e.toString() + "'");
    +        }
    +        return rollbackException;
    +    }
    +
    +
    +    private static final Log LOG = LogFactory.getLog(JdbcAccessor.class);
    +
    +    private static final String failedInsertMessage = "Insert failed due to an SQLException. The target database does not support transactions and SOME DATA MAY HAVE BEEN INSERTED. ";
    --- End diff --
    
    This is a constant, so it should be named like one, e.g. `FAILED_INSERT_MESSAGE`.
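
A minimal sketch of the suggested rename, for illustration only. The class `InsertFailureMessages` and the helper `describe` are hypothetical and not part of the pull request; only the message texts are taken from the quoted diff.

```java
// Hypothetical holder class; in the pull request the field lives in JdbcAccessor.
class InsertFailureMessages {
    // Constant renamed to upper snake case, as the review comment suggests.
    static final String FAILED_INSERT_MESSAGE =
        "Insert failed due to an SQLException. The target database does not support "
        + "transactions and SOME DATA MAY HAVE BEEN INSERTED. ";

    // Hypothetical helper that appends the detail text used in the quoted tryRollback().
    static String describe(boolean batchingUsed) {
        if (batchingUsed) {
            return FAILED_INSERT_MESSAGE + "The exact number of tuples inserted is unknown.";
        }
        return FAILED_INSERT_MESSAGE + "The exact number of tuples inserted can be found in PXF logs.";
    }
}
```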


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213480610
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java ---
    @@ -0,0 +1,108 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.sql.PreparedStatement;
    +
    +/**
    + * An object that processes INSERT operation on {@link OneRow} objects
    + */
    +public class WriterCallableFactory {
    +    /**
    +     * Create a new factory.
    +     *
    +     * Note that before constructing {@link WriterCallable}, 'setBatchSize' and 'setStatement' must be called.
    +     *
    +     * By default, 'batchSize' is 1 and 'statement' is null
    +     */
    +    public WriterCallableFactory() {
    +        batchSize = 1;
    +        plugin = null;
    +        query = null;
    +        statement = null;
    +    }
    +
    +    /**
    +     * Get an instance of WriterCallable
    +     *
    +     * @return an implementation of WriterCallable, chosen based on parameters that were set for this factory
    +     * @throws IllegalArgumentException if a WriterCallable cannot be created with requested parameters
    +     */
    +    public WriterCallable get() throws IllegalArgumentException {
    --- End diff --
    
    Do we need to explicitly declare runtime exceptions, such as IllegalArgumentException, in the `throws` clause? The caller cannot recover from them in these use cases anyway.
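
For context, Java does not require unchecked exceptions to appear in a `throws` clause; documenting them with a Javadoc `@throws` tag is enough. The sketch below uses a hypothetical `QueryHolder` class that is not part of the pull request.

```java
// Hypothetical class used only to illustrate the point about unchecked exceptions.
class QueryHolder {
    /**
     * Trim a query string.
     *
     * @param query the SQL text to normalize
     * @return the trimmed query
     * @throws IllegalArgumentException if the query is null (documented here, not in the signature)
     */
    static String normalize(String query) {  // no 'throws' clause is needed for a RuntimeException
        if (query == null) {
            throw new IllegalArgumentException("The provided SQL query is null");
        }
        return query.trim();
    }
}
```

The method compiles and behaves the same with or without `throws IllegalArgumentException`; the clause is purely informational for unchecked exceptions.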


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213477883
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---
    @@ -0,0 +1,113 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +
    +/**
    + * This writer makes batch INSERTs.
    + *
    + * A call() is required after a certain number of supply() calls
    + */
    +class BatchWriterCallable implements WriterCallable {
    +    @Override
    +    public void supply(OneRow row) throws IllegalStateException {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
    +        }
    +        if (row == null) {
    +            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
    +        }
    +        rows.add(row);
    +    }
    +
    +    @Override
    +    public boolean isCallRequired() {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            return true;
    +        }
    +        return false;
    --- End diff --
    
    It would be better to name the method isBatchFull() and give it a one-line implementation:
    ```
    return (maxRowsCount > 0) && (rows.size() >= maxRowsCount);
    ```


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214413534
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java ---
    @@ -75,16 +75,15 @@ public void setQuery(String query) {
         /**
          * Set batch size to use.
          *
    -     * @param batchSize < 0: Use batches of infinite size
    +     * @param batchSize = 0: Use batches of recommended size
          * @param batchSize = 1: Do not use batches
          * @param batchSize > 1: Use batches of the given size
    +     * @param batchSize < 0: Use batches of infinite size
          */
         public void setBatchSize(int batchSize) {
    -        if (batchSize < 0) {
    -            batchSize = 0;
    -        }
    -        else if (batchSize == 0) {
    -            batchSize = 1;
    +        if (batchSize == 0) {
    +            // Set the recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
    +            batchSize = 100;
    --- End diff --
    
    This is counter-intuitive; a good guideline is to never change the meaning of a user-set parameter. I would propose the following handling for the user-provided value of BATCH_SIZE:
    1. nothing (not provided) -- use the default of 100
    2. 0 -- error out -- a batch size of 0 does not make sense
    3. 1 -- error out -- a batch size of 1 does not make sense
    4. < 0 -- error out -- a negative batch size does not make sense
    5. > 1 and < 65k -- use the number provided by the user
    6. > 65k -- error out -- too large for practical purposes?
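
The proposal above could be sketched roughly as follows. This is an illustration under assumptions, not the plugin's code: the class `BatchSizeOption`, the method `parse`, and the exact upper bound of 65535 (one reading of "65k") are all hypothetical.

```java
// Hypothetical validator for a user-supplied BATCH_SIZE value.
class BatchSizeOption {
    static final int DEFAULT_BATCH_SIZE = 100;  // default named in the review comment
    static final int MAX_BATCH_SIZE = 65535;    // assumption: one interpretation of "65k"

    /**
     * @param raw the raw option value, or null if the user did not provide one
     * @return the batch size to use
     * @throws IllegalArgumentException if the value is not an integer in (1, MAX_BATCH_SIZE]
     */
    static int parse(String raw) {
        if (raw == null) {
            return DEFAULT_BATCH_SIZE;  // not provided: fall back to the default
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("BATCH_SIZE is not an integer: '" + raw + "'", e);
        }
        // Reject 0, 1, negative and overly large values instead of reinterpreting them
        if (value <= 1 || value > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException(
                "BATCH_SIZE must be greater than 1 and at most " + MAX_BATCH_SIZE + ", got " + value);
        }
        return value;
    }
}
```

The point of the design is that the value the user typed is either used as-is or rejected; it is never silently mapped to a different meaning.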


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213478992
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---
    @@ -0,0 +1,113 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +
    +/**
    + * This writer makes batch INSERTs.
    + *
    + * A call() is required after a certain number of supply() calls
    + */
    +class BatchWriterCallable implements WriterCallable {
    +    @Override
    +    public void supply(OneRow row) throws IllegalStateException {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
    +        }
    +        if (row == null) {
    +            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
    +        }
    +        rows.add(row);
    +    }
    +
    +    @Override
    +    public boolean isCallRequired() {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    @Override
    +    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
    +        if (rows.isEmpty()) {
    +            return null;
    +        }
    +
    +        boolean statementMustBeDeleted = false;
    +        if (statement == null) {
    +            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
    +            statementMustBeDeleted = true;
    +        }
    +
    +        for (OneRow row : rows) {
    +            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
    +            statement.addBatch();
    +        }
    +
    +        try {
    +            statement.executeBatch();
    +        }
    +        catch (SQLException e) {
    +            return e;
    +        }
    +        finally {
    +            rows.clear();
    +            if (statementMustBeDeleted) {
    +                JdbcPlugin.closeStatement(statement);
    +                statement = null;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    /**
    +     * Construct a new batch writer
    +     */
    +    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int maxRowsCount) throws IllegalArgumentException {
    +        if ((plugin == null) || (query == null)) {
    +            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
    +        }
    +        this.plugin = plugin;
    +        this.query = query;
    +        this.statement = statement;
    +        if (maxRowsCount < 0) {
    +            maxRowsCount = 0;
    --- End diff --
    
    I am not sure we should allow an unlimited batch size. Would it be better to default to some value, say 1000?


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by kapustor <gi...@git.apache.org>.
Github user kapustor commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213941270
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---
    @@ -0,0 +1,113 @@
    +package org.apache.hawq.pxf.plugins.jdbc.writercallable;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
    +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.io.IOException;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +
    +/**
    + * This writer makes batch INSERTs.
    + *
    + * A call() is required after a certain number of supply() calls
    + */
    +class BatchWriterCallable implements WriterCallable {
    +    @Override
    +    public void supply(OneRow row) throws IllegalStateException {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
    +        }
    +        if (row == null) {
    +            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
    +        }
    +        rows.add(row);
    +    }
    +
    +    @Override
    +    public boolean isCallRequired() {
    +        if ((maxRowsCount > 0) && (rows.size() >= maxRowsCount)) {
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    @Override
    +    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
    +        if (rows.isEmpty()) {
    +            return null;
    +        }
    +
    +        boolean statementMustBeDeleted = false;
    +        if (statement == null) {
    +            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
    +            statementMustBeDeleted = true;
    +        }
    +
    +        for (OneRow row : rows) {
    +            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
    +            statement.addBatch();
    +        }
    +
    +        try {
    +            statement.executeBatch();
    +        }
    +        catch (SQLException e) {
    +            return e;
    +        }
    +        finally {
    +            rows.clear();
    +            if (statementMustBeDeleted) {
    +                JdbcPlugin.closeStatement(statement);
    +                statement = null;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    /**
    +     * Construct a new batch writer
    +     */
    +    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int maxRowsCount) throws IllegalArgumentException {
    +        if ((plugin == null) || (query == null)) {
    +            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
    +        }
    +        this.plugin = plugin;
    +        this.query = query;
    +        this.statement = statement;
    +        if (maxRowsCount < 0) {
    +            maxRowsCount = 0;
    --- End diff --
    
    1000 is too small for a default; for example, the usual value for Ignite DB is 25000. Maybe 10000?
    By the way, what are the disadvantages of an unlimited batch?
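
One concern often raised about unbounded batches is that every pending row stays in memory until the final flush; in the quoted BatchWriterCallable, rows are collected in a list until call() runs. The sketch below shows a size-capped alternative. `BoundedBatchBuffer` and its `flusher` callback are hypothetical names, and the callback stands in for whatever actually executes the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer that never holds more than maxRows pending rows.
class BoundedBatchBuffer<T> {
    private final int maxRows;
    private final Consumer<List<T>> flusher;  // stand-in for the code that executes a batch
    private final List<T> rows = new ArrayList<>();

    BoundedBatchBuffer(int maxRows, Consumer<List<T>> flusher) {
        if (maxRows < 1) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        this.maxRows = maxRows;
        this.flusher = flusher;
    }

    // Add a row and flush automatically once the cap is reached
    void add(T row) {
        rows.add(row);
        if (rows.size() >= maxRows) {
            flush();
        }
    }

    // Hand a copy of the pending rows to the flusher and clear the buffer
    void flush() {
        if (rows.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(rows));
        rows.clear();
    }

    int pending() {
        return rows.size();
    }
}
```

With a cap, memory use is bounded by the batch size regardless of how many rows the INSERT produces; a final flush() is still needed for the last partial batch.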


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r182614006
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    --- End diff --
    
    Do we intend to allow multiple calls of this method? Shouldn't we throw an exception in this case?
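
The behaviour suggested above can be sketched as a guard that rejects a second open instead of silently returning `true`. This is a minimal illustration only: `WriteSession` and its `open` flag are hypothetical stand-ins, not PXF classes, and the choice of `IllegalStateException` is an assumption.

```java
// Hypothetical stand-in for an accessor; not part of PXF.
final class WriteSession {
    private boolean open = false;

    // Opens the session once; a second call is treated as a programming error.
    boolean openForWrite() {
        if (open) {
            throw new IllegalStateException("openForWrite() was already called");
        }
        open = true;
        return true;
    }

    // Closing resets the flag, so a later open is legal again.
    void closeForWrite() {
        open = false;
    }
}
```

Failing loudly makes an unexpected second call visible at once, whereas returning `true` would hide it.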


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    @sansanichfb -- can you please approve the PR if it looks good to you? We are planning to commit it soon.


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r183218994
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +        if (dbMeta.supportsTransactions()) {
    +            dbConn.setAutoCommit(false);
    +        }
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = dbConn.prepareStatement(queryWrite);
    +
    +        if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) {
    +            LOG.info(
    +                "The database '" +
    +                dbMeta.getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     */
    +    @Override
    +    @SuppressWarnings("unchecked")
    +    public boolean writeNextObject(OneRow row) throws SQLException, IOException {
    +        // This cast is safe because the data in the row is formed by JdbcPlugin
    +        List<OneField> tuple = (List<OneField>) row.getData();
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("writeNextObject() called");
    +        }
    +
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Field " + i + ": " + DataType.get(field.type).toString());
    +            }
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statementWrite.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BIGINT);
    +                    }
    +                    else {
    +                        statementWrite.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.SMALLINT);
    +                    }
    +                    else {
    +                        statementWrite.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.REAL);
    +                    }
    +                    else {
    +                        statementWrite.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statementWrite.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statementWrite.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statementWrite.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statementWrite.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statementWrite.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statementWrite.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statementWrite.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statementWrite.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from JdbcResolver is corrupted");
    --- End diff --
    
    This message is correct. If a certain data type is not supported, the exception must be thrown [in `JdbcResolver`](https://github.com/arenadata/incubator-hawq/blob/pxf_jdbc_writeAndFix/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java#L223).
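
The division of responsibility described in this comment can be sketched as a two-stage check: the resolver rejects unsupported types, so the accessor's `default` branch is only reached if the tuple was damaged or the resolver was bypassed. Everything below is hypothetical: the `FieldType` enum, the class name, and the use of `UnsupportedOperationException` in the resolver stage are assumptions for illustration, not the plugin's actual code.

```java
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical two-stage check; the enum and class names are illustrative only.
final class TypeCheckSketch {
    enum FieldType { INTEGER, TEXT, JSON }

    // Types the write path knows how to bind.
    private static final Set<FieldType> SUPPORTED =
            EnumSet.of(FieldType.INTEGER, FieldType.TEXT);

    // Resolver stage: reject unsupported types with a user-facing error.
    static FieldType resolve(FieldType type) {
        if (!SUPPORTED.contains(type)) {
            throw new UnsupportedOperationException("Type " + type + " is not supported");
        }
        return type;
    }

    // Accessor stage: an unexpected type here means the resolver was bypassed.
    static String bind(FieldType type) throws IOException {
        switch (type) {
            case INTEGER:
                return "setInt";
            case TEXT:
                return "setString";
            default:
                throw new IOException("The data tuple from the resolver is corrupted");
        }
    }
}
```

With this split, a user who declares an unsupported column type gets the resolver's message, and the accessor's message is reserved for genuinely inconsistent input.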


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214408051
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,437 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.sql.Statement;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.BatchUpdateException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryRead = buildSelectQuery(connection.getMetaData());
    +        statementRead = connection.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     */
    +    @Override
    +    public void closeForRead() {
    +        JdbcPlugin.closeStatement(statementRead);
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     * @throws IllegalArgumentException if the provided or generated combination of user-defined parameters cannot be processed
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException, IllegalArgumentException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            throw new SQLException("The connection to an external database is already open.");
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = super.getPreparedStatement(connection, queryWrite);
    +
    +        // Process batchSize
    +        if ((batchSize != 0) && (!connection.getMetaData().supportsBatchUpdates())) {
    +            LOG.warn(
    +                "The database '" +
    +                connection.getMetaData().getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        // Process poolSize
    +        if (poolSize < 1) {
    +            poolSize = Runtime.getRuntime().availableProcessors();
    +            LOG.info(
    +                "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize) + ")"
    +            );
    +        }
    +        if (poolSize > 1) {
    +            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
    +            poolTasks = new LinkedList<>();
    +        }
    +
    +        // Setup WriterCallableFactory
    +        writerCallableFactory = new WriterCallableFactory();
    +        writerCallableFactory.setPlugin(this);
    +        writerCallableFactory.setQuery(queryWrite);
    +        writerCallableFactory.setBatchSize(batchSize);
    +        if (poolSize == 1) {
    +            writerCallableFactory.setStatement(statementWrite);
    +        }
    +
    +        writerCallable = writerCallableFactory.get();
    +
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not found
    +     * @throws IllegalStateException if writerCallableFactory was not properly initialized
    +     * @throws Exception if it happens in writerCallable.call()
    +     */
    +    @Override
    +    public boolean writeNextObject(OneRow row) throws Exception {
    +        if (writerCallable == null) {
    +            throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)");
    +        }
    +
    +        SQLException rollbackException = null;
    +
    +        writerCallable.supply(row);
    +        if (writerCallable.isCallRequired()) {
    +            if (poolSize > 1) {
    +                // Pooling is used. Create new writerCallable
    +                poolTasks.add(executorServiceWrite.submit(writerCallable));
    +                writerCallable = writerCallableFactory.get();
    +            }
    +            else {
    +                // Pooling is not used
    +                try {
    +                    rollbackException = writerCallable.call();
    +                }
    +                catch (SQLException e) {
    +                    rollbackException = e;
    +                }
    +                catch (Exception e) {
    +                    // This is not expected
    +                    throw e;
    +                }
    +            }
    +        }
    +
    +        if (rollbackException != null) {
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Rollback is now required");
    +            }
    +            rollbackException = tryRollback(statementWrite.getConnection(), rollbackException);
    --- End diff --
    
    @denalex, you are right.
    The only scenario in which `tryRollback()` works properly is a single PXF segment, a single thread, and an external database that supports transactions.
    In all cases more than one PXF segment is available, so there is no guarantee that rollback will work properly; rollback is therefore of no use.
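
The reasoning above can be illustrated with a toy model in which every segment writes through its own connection and transaction. No JDBC driver or PXF class is involved; `Segment`, `table`, and `run()` are hypothetical names, and the model assumes that a rollback affects only the transaction that issued it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: each "segment" has its own transaction against a shared table.
final class RollbackScopeSketch {
    // Shared external table; rows become visible here only after a commit.
    static final List<String> table = new ArrayList<>();

    static final class Segment {
        private final List<String> pending = new ArrayList<>();

        void insert(String row) {
            pending.add(row);
        }

        // Commit publishes this segment's rows only.
        void commit() {
            table.addAll(pending);
            pending.clear();
        }

        // Rollback discards this segment's uncommitted rows only.
        void rollback() {
            pending.clear();
        }
    }

    static List<String> run() {
        table.clear();
        Segment a = new Segment();
        Segment b = new Segment();
        a.insert("a1");
        a.commit();      // segment A finishes successfully
        b.insert("b1");
        b.rollback();    // segment B fails and rolls back
        // A's row remains, so the INSERT as a whole is only partially applied.
        return new ArrayList<>(table);
    }
}
```

In this model `run()` leaves the row written by segment A in place, which is the partial result a per-connection rollback cannot undo.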


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213504917
  
    --- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
    @@ -177,11 +177,11 @@ under the License.
         </profile>
         <profile>
             <name>Jdbc</name>
    -        <description>A profile for reading data into HAWQ via JDBC</description>
    +        <description>A profile to access (read & write) data via JDBC</description>
    --- End diff --
    
    The "&" character here breaks the XML, and the PXF server does not even start up. Please remove it.
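
The problem can be reproduced with the JDK's own XML parser, as in the sketch below. The class and method names are hypothetical, and the snippet parses a one-element document rather than the real `pxf-profiles-default.xml`. Either removing the character or escaping it as `&amp;` keeps the document well-formed.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

final class XmlAmpersandSketch {
    // Returns the root element's text, or null if the input is not well-formed XML.
    static String parseDescription(String xml) throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        // Replace the default error handler, which prints fatal errors to stderr.
        builder.setErrorHandler(new DefaultHandler());
        try {
            Document doc = builder.parse(
                    new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getDocumentElement().getTextContent();
        } catch (SAXException e) {
            // A bare '&' starts an entity reference, so the parser rejects the document.
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parseDescription("<description>read & write</description>"));
        System.out.println(parseDescription("<description>read &amp; write</description>"));
    }
}
```

The first call returns `null` because the document is rejected; the second returns the text with the ampersand restored by the parser.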


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r193745222
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,469 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +
    +import java.util.List;
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.math.BigDecimal;
    +import java.sql.Types;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation disappeared
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        super.openConnection();
    +
    +        queryRead = buildSelectQuery();
    +        statementRead = dbConn.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * @throws SQLException if a database access error occurs
    +     */
    +    @Override
    +    public void closeForRead() throws SQLException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            statementRead.close();
    +            statementRead = null;
    +        }
    +        super.closeConnection();
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the superclass implementation has disappeared
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    --- End diff --
    
    Fixed: https://github.com/apache/incubator-hawq/pull/1353/commits/b877c908ef202872364ccbfc20761116aa939ccf#diff-6829beb5b62de0a4880b5d5dbebca91aR127.
    
    Note that in the current version of PXF, this procedure is always called once.


---

[GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...

Posted by denalex <gi...@git.apache.org>.
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213481546
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java ---
    @@ -0,0 +1,437 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.sql.Statement;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.BatchUpdateException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryRead = buildSelectQuery(connection.getMetaData());
    +        statementRead = connection.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     */
    +    @Override
    +    public void closeForRead() {
    +        JdbcPlugin.closeStatement(statementRead);
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     * @throws IllegalArgumentException if the provided or generated combination of user-defined parameters cannot be processed
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException, IllegalArgumentException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            throw new SQLException("The connection to an external database is already open.");
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = super.getPreparedStatement(connection, queryWrite);
    +
    +        // Process batchSize
    +        if ((batchSize != 0) && (!connection.getMetaData().supportsBatchUpdates())) {
    +            LOG.warn(
    +                "The database '" +
    +                connection.getMetaData().getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        // Process poolSize
    +        if (poolSize < 1) {
    +            poolSize = Runtime.getRuntime().availableProcessors();
    +            LOG.info(
    +                "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize) + ")"
    +            );
    +        }
    +        if (poolSize > 1) {
    +            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
    +            poolTasks = new LinkedList<>();
    +        }
    +
    +        // Setup WriterCallableFactory
    +        writerCallableFactory = new WriterCallableFactory();
    +        writerCallableFactory.setPlugin(this);
    +        writerCallableFactory.setQuery(queryWrite);
    +        writerCallableFactory.setBatchSize(batchSize);
    +        if (poolSize == 1) {
    +            writerCallableFactory.setStatement(statementWrite);
    +        }
    +
    +        writerCallable = writerCallableFactory.get();
    +
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not found
    +     * @throws IllegalStateException if writerCallableFactory was not properly initialized
    +     * @throws Exception if it happens in writerCallable.call()
    +     */
    +    @Override
    +    public boolean writeNextObject(OneRow row) throws Exception {
    +        if (writerCallable == null) {
    +            throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)");
    +        }
    +
    +        SQLException rollbackException = null;
    +
    +        writerCallable.supply(row);
    +        if (writerCallable.isCallRequired()) {
    +            if (poolSize > 1) {
    +                // Pooling is used. Create new writerCallable
    +                poolTasks.add(executorServiceWrite.submit(writerCallable));
    +                writerCallable = writerCallableFactory.get();
    +            }
    +            else {
    +                // Pooling is not used
    +                try {
    +                    rollbackException = writerCallable.call();
    +                }
    +                catch (SQLException e) {
    +                    rollbackException = e;
    +                }
    +                catch (Exception e) {
    +                    // This is not expected
    +                    throw e;
    +                }
    +            }
    +        }
    +
    +        if (rollbackException != null) {
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Rollback is now required");
    +            }
    +            rollbackException = tryRollback(statementWrite.getConnection(), rollbackException);
    --- End diff --
    
    We cannot guarantee transactional semantics for this profile anyway, as a single INSERT will be executed by multiple segments and multiple PXF threads, and there is no way for one of the threads to know the status of the others. It was suggested before to remove the rollback logic altogether, as it serves no practical purpose and just complicates the code and the processing logic. @leskin-in @kapustor -- What do you think?
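
The point about independent writers can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions: the `Writer` class and the shared list are hypothetical stand-ins for PXF threads and the external table, not code from the plugin, and no JDBC calls are made. Each writer can commit or roll back only its own rows, so a rollback in one writer does not undo rows already committed by another.

```java
import java.util.ArrayList;
import java.util.List;

public class IndependentWriters {
    /** Hypothetical stand-in for one PXF thread writing over its own connection. */
    static final class Writer {
        private final List<String> target;                  // shared "external table"
        private final List<String> pending = new ArrayList<>(); // rows not yet committed

        Writer(List<String> target) {
            this.target = target;
        }

        void insert(String row) {
            pending.add(row);
        }

        /** Makes this writer's pending rows visible in the shared table. */
        void commit() {
            synchronized (target) {
                target.addAll(pending);
            }
            pending.clear();
        }

        /** Discards only this writer's pending rows. */
        void rollback() {
            pending.clear();
        }
    }

    /** Runs two writers; one commits, the other fails and rolls back. */
    static List<String> run() {
        List<String> table = new ArrayList<>();
        Writer a = new Writer(table);
        Writer b = new Writer(table);

        a.insert("a1");
        a.insert("a2");
        b.insert("b1");

        a.commit();   // writer A finishes successfully
        b.rollback(); // writer B fails later; A's rows stay in the table

        return table;
    }

    public static void main(String[] args) {
        // The table ends up holding A's rows only: a partial INSERT.
        System.out.println(run());
    }
}
```

In this simulation the overall INSERT is only partially applied even though the failing writer rolled back correctly, which is the situation the comment above describes.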


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Hi @kapustor ,
    Sorry for the late reply.
    I don't have any new comments; there are a couple of old comments that weren't replied to.
    Apart from that, it looks good.


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by leskin-in <gi...@git.apache.org>.
Github user leskin-in commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Hi @sansanichfb,
    You have mentioned some questions that were not replied to. I am a bit confused; could you clarify what exactly is unclear or should be corrected?
    
    I have also left notes on each of the comments you've made in the code.


---

[GitHub] incubator-hawq issue #1353: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by sansanichfb <gi...@git.apache.org>.
Github user sansanichfb commented on the issue:

    https://github.com/apache/incubator-hawq/pull/1353
  
    Good job, leskin-in! I left some comments. Also, I am wondering which target database the patch was tested against. The transactional approach isn't clear to me: if the DB doesn't support transactions, you are still sending data in batches, which might leave the target DB in an inconsistent state, since you can't roll back the last transaction. Should we reconsider this approach and send only one batch in that case (not ideal, though; maybe ask users to tweak partitioning)?
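
One possible reading of this suggestion is sketched below as a small decision function. The `WriteMode` enum and the `choose` method are hypothetical names introduced for illustration and are not part of the plugin. In real code the two boolean inputs would presumably come from `java.sql.DatabaseMetaData.supportsBatchUpdates()` and `java.sql.DatabaseMetaData.supportsTransactions()`; they are passed in as plain values here so the sketch runs without a database.

```java
public class WriteModeChooser {
    /** Hypothetical write strategies; the names are assumptions for this sketch. */
    enum WriteMode {
        PER_ROW,      // execute every INSERT immediately
        BATCHED,      // send several batches of a user-defined size
        SINGLE_BATCH  // accumulate all rows and send them as one batch
    }

    /**
     * Picks a write strategy from the capabilities reported by the target database.
     *
     * @param supportsBatchUpdates whether the driver reports batch update support
     * @param supportsTransactions whether the driver reports transaction support
     */
    static WriteMode choose(boolean supportsBatchUpdates, boolean supportsTransactions) {
        if (!supportsBatchUpdates) {
            // Batching is unavailable, so fall back to row-by-row execution.
            return WriteMode.PER_ROW;
        }
        // Without transactions, an earlier batch cannot be rolled back,
        // so this sketch sends everything as a single batch instead.
        return supportsTransactions ? WriteMode.BATCHED : WriteMode.SINGLE_BATCH;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true));
        System.out.println(choose(true, false));
        System.out.println(choose(false, false));
    }
}
```

Note that sending a single batch does not by itself make the write atomic on a non-transactional database; it only avoids the case where earlier batches have been applied before a later one fails.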


---