Posted to commits@hawq.apache.org by ad...@apache.org on 2018/09/07 20:28:48 UTC

[1/2] hawq git commit: HAWQ-1605. Support INSERT in PXF JDBC plugin

Repository: hawq
Updated Branches:
  refs/heads/master a741655ba -> 472fa2b74


http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
deleted file mode 100644
index 1c61537..0000000
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.hawq.pxf.plugins.jdbc;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hawq.pxf.api.*;
-import org.apache.hawq.pxf.api.io.DataType;
-import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
-import org.apache.hawq.pxf.api.utilities.InputData;
-import org.apache.hawq.pxf.api.utilities.Plugin;
-
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Class JdbcReadResolver Read the Jdbc ResultSet, and generates the data type - List {@link OneField}.
- */
-public class JdbcReadResolver extends Plugin implements ReadResolver {
-    private static final Log LOG = LogFactory.getLog(JdbcReadResolver.class);
-    //HAWQ Table column definitions
-    private ArrayList<ColumnDescriptor> columns = null;
-
-    public JdbcReadResolver(InputData input) {
-        super(input);
-        columns = input.getTupleDescription();
-    }
-
-    @Override
-    public List<OneField> getFields(OneRow row) throws Exception {
-        ResultSet result = (ResultSet) row.getData();
-        LinkedList<OneField> fields = new LinkedList<>();
-
-        for (int i = 0; i < columns.size(); i++) {
-            ColumnDescriptor column = columns.get(i);
-            String colName = column.columnName();
-            Object value = null;
-
-            OneField oneField = new OneField();
-            oneField.type = column.columnTypeCode();
-
-            switch (DataType.get(oneField.type)) {
-                case INTEGER:
-                    value = result.getInt(colName);
-                    break;
-                case FLOAT8:
-                    value = result.getDouble(colName);
-                    break;
-                case REAL:
-                    value = result.getFloat(colName);
-                    break;
-                case BIGINT:
-                    value = result.getLong(colName);
-                    break;
-                case SMALLINT:
-                    value = result.getShort(colName);
-                    break;
-                case BOOLEAN:
-                    value = result.getBoolean(colName);
-                    break;
-                case BYTEA:
-                    value = result.getBytes(colName);
-                    break;
-                case VARCHAR:
-                case BPCHAR:
-                case TEXT:
-                case NUMERIC:
-                    value = result.getString(colName);
-                    break;
-                case TIMESTAMP:
-                case DATE:
-                    value = result.getDate(colName);
-                    break;
-                default:
-                    throw new UnsupportedOperationException("Unknwon Field Type : " + DataType.get(oneField.type).toString()
-                            + ", Column : " + column.toString());
-            }
-            oneField.val = value;
-            fields.add(oneField);
-        }
-        return fields;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java
new file mode 100644
index 0000000..ab88326
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java
@@ -0,0 +1,367 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.WriteResolver;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * JDBC tables resolver
+ */
+public class JdbcResolver extends JdbcPlugin implements ReadResolver, WriteResolver {
+    /**
+     * Class constructor
+     */
+    public JdbcResolver(InputData input) throws UserDataException {
+        super(input);
+    }
+
+    /**
+     * getFields() implementation
+     *
+     * @throws SQLException if the provided {@link OneRow} object is invalid
+     */
+    @Override
+    public List<OneField> getFields(OneRow row) throws SQLException {
+        ResultSet result = (ResultSet) row.getData();
+        LinkedList<OneField> fields = new LinkedList<>();
+
+        for (ColumnDescriptor column : columns) {
+            String colName = column.columnName();
+            Object value = null;
+
+            OneField oneField = new OneField();
+            oneField.type = column.columnTypeCode();
+
+            switch (DataType.get(oneField.type)) {
+                case INTEGER:
+                    value = result.getInt(colName);
+                    break;
+                case FLOAT8:
+                    value = result.getDouble(colName);
+                    break;
+                case REAL:
+                    value = result.getFloat(colName);
+                    break;
+                case BIGINT:
+                    value = result.getLong(colName);
+                    break;
+                case SMALLINT:
+                    value = result.getShort(colName);
+                    break;
+                case BOOLEAN:
+                    value = result.getBoolean(colName);
+                    break;
+                case BYTEA:
+                    value = result.getBytes(colName);
+                    break;
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                case NUMERIC:
+                    value = result.getString(colName);
+                    break;
+                case DATE:
+                    value = result.getDate(colName);
+                    break;
+                case TIMESTAMP:
+                    value = result.getTimestamp(colName);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
+            }
+
+            oneField.val = value;
+            fields.add(oneField);
+        }
+        return fields;
+    }
+
+    /**
+     * setFields() implementation
+     *
+     * @return OneRow with the data field containing a List<OneField>
+     * OneFields are not reordered before being passed to the Accessor; at the
+     * moment, there is no way to correct the order of the fields if it is wrong.
+     * In practice, the provided 'record' is always ordered correctly.
+     *
+     * @throws UnsupportedOperationException if field of some type is not supported
+     */
+    @Override
+    public OneRow setFields(List<OneField> record) throws UnsupportedOperationException, ParseException {
+        int column_index = 0;
+        for (OneField oneField : record) {
+            ColumnDescriptor column = columns.get(column_index);
+            if (
+                LOG.isDebugEnabled() &&
+                DataType.get(column.columnTypeCode()) != DataType.get(oneField.type)
+            ) {
+                LOG.warn("The provided tuple of data may be disordered. Datatype of column with descriptor '" + column.toString() + "' must be '" + DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + DataType.get(oneField.type).toString() + "'");
+            }
+
+            // Check that data type is supported
+            switch (DataType.get(oneField.type)) {
+                case BOOLEAN:
+                case INTEGER:
+                case FLOAT8:
+                case REAL:
+                case BIGINT:
+                case SMALLINT:
+                case NUMERIC:
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                case BYTEA:
+                case TIMESTAMP:
+                case DATE:
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
+            }
+
+            if (
+                LOG.isDebugEnabled() &&
+                DataType.get(oneField.type) == DataType.BYTEA
+            ) {
+                LOG.debug("OneField content (conversion from BYTEA): '" + new String((byte[])oneField.val) + "'");
+            }
+
+            // Convert TEXT columns into native data types
+            if ((DataType.get(oneField.type) == DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
+                oneField.type = column.columnTypeCode();
+                if (oneField.val != null) {
+                    String rawVal = (String)oneField.val;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("OneField content (conversion from TEXT): '" + rawVal + "'");
+                    }
+                    switch (DataType.get(column.columnTypeCode())) {
+                        case VARCHAR:
+                        case BPCHAR:
+                        case TEXT:
+                        case BYTEA:
+                            break;
+                        case BOOLEAN:
+                            oneField.val = (Object)Boolean.parseBoolean(rawVal);
+                            break;
+                        case INTEGER:
+                            oneField.val = (Object)Integer.parseInt(rawVal);
+                            break;
+                        case FLOAT8:
+                            oneField.val = (Object)Double.parseDouble(rawVal);
+                            break;
+                        case REAL:
+                            oneField.val = (Object)Float.parseFloat(rawVal);
+                            break;
+                        case BIGINT:
+                            oneField.val = (Object)Long.parseLong(rawVal);
+                            break;
+                        case SMALLINT:
+                            oneField.val = (Object)Short.parseShort(rawVal);
+                            break;
+                        case NUMERIC:
+                            oneField.val = (Object)new BigDecimal(rawVal);
+                            break;
+                        case TIMESTAMP:
+                            boolean isConversionSuccessful = false;
+                            for (SimpleDateFormat sdf : timestampSDFs.get()) {
+                                try {
+                                    java.util.Date parsedTimestamp = sdf.parse(rawVal);
+                                    oneField.val = (Object)new Timestamp(parsedTimestamp.getTime());
+                                    isConversionSuccessful = true;
+                                    break;
+                                }
+                                catch (ParseException e) {
+                                    // pass
+                                }
+                            }
+                            if (!isConversionSuccessful) {
+                                throw new ParseException(rawVal, 0);
+                            }
+                            break;
+                        case DATE:
+                            oneField.val = (Object)new Date(dateSDF.get().parse(rawVal).getTime());
+                            break;
+                        default:
+                            throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported");
+                    }
+                }
+            }
+
+            column_index += 1;
+        }
+        return new OneRow(new LinkedList<OneField>(record));
+    }
+
+    /**
+     * Decode OneRow object and pass all its contents to a PreparedStatement
+     *
+     * @throws IOException if data in a OneRow is corrupted
+     * @throws SQLException if the given statement is broken
+     */
+    @SuppressWarnings("unchecked")
+    public static void decodeOneRowToPreparedStatement(OneRow row, PreparedStatement statement) throws IOException, SQLException {
+        // This is safe: OneRow comes from JdbcResolver
+        List<OneField> tuple = (List<OneField>)row.getData();
+        for (int i = 1; i <= tuple.size(); i++) {
+            OneField field = tuple.get(i - 1);
+            switch (DataType.get(field.type)) {
+                case INTEGER:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.INTEGER);
+                    }
+                    else {
+                        statement.setInt(i, (int)field.val);
+                    }
+                    break;
+                case BIGINT:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.INTEGER);
+                    }
+                    else {
+                        statement.setLong(i, (long)field.val);
+                    }
+                    break;
+                case SMALLINT:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.INTEGER);
+                    }
+                    else {
+                        statement.setShort(i, (short)field.val);
+                    }
+                    break;
+                case REAL:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.FLOAT);
+                    }
+                    else {
+                        statement.setFloat(i, (float)field.val);
+                    }
+                    break;
+                case FLOAT8:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.DOUBLE);
+                    }
+                    else {
+                        statement.setDouble(i, (double)field.val);
+                    }
+                    break;
+                case BOOLEAN:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.BOOLEAN);
+                    }
+                    else {
+                        statement.setBoolean(i, (boolean)field.val);
+                    }
+                    break;
+                case NUMERIC:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.NUMERIC);
+                    }
+                    else {
+                        statement.setBigDecimal(i, (BigDecimal)field.val);
+                    }
+                    break;
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.VARCHAR);
+                    }
+                    else {
+                        statement.setString(i, (String)field.val);
+                    }
+                    break;
+                case BYTEA:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.BINARY);
+                    }
+                    else {
+                        statement.setBytes(i, (byte[])field.val);
+                    }
+                    break;
+                case TIMESTAMP:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.TIMESTAMP);
+                    }
+                    else {
+                        statement.setTimestamp(i, (Timestamp)field.val);
+                    }
+                    break;
+                case DATE:
+                    if (field.val == null) {
+                        statement.setNull(i, Types.DATE);
+                    }
+                    else {
+                        statement.setDate(i, (Date)field.val);
+                    }
+                    break;
+                default:
+                    throw new IOException("The data tuple from JdbcResolver is corrupted");
+            }
+        }
+    }
+
+    private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
+
+    // SimpleDateFormat to parse TEXT into DATE
+    private static ThreadLocal<SimpleDateFormat> dateSDF = new ThreadLocal<SimpleDateFormat>() {
+        @Override protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyy-MM-dd");
+        }
+    };
+    // SimpleDateFormat to parse TEXT into TIMESTAMP (with microseconds)
+    private static ThreadLocal<SimpleDateFormat[]> timestampSDFs = new ThreadLocal<SimpleDateFormat[]>() {
+        @Override protected SimpleDateFormat[] initialValue() {
+            SimpleDateFormat[] retRes = {
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"),
+                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"),
+                new SimpleDateFormat("yyyy-MM-dd")
+            };
+            return retRes;
+        }
+    };
+}
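
For reference, a minimal write-path sketch (illustrative, not part of this commit): setFields() turns a HAWQ tuple into a OneRow, and decodeOneRowToPreparedStatement() later binds that OneRow to a JDBC PreparedStatement. It assumes a JdbcResolver initialized for a table (id INTEGER, name TEXT), a PreparedStatement prepared elsewhere, and the DataType.getOID() accessors from pxf-api; the class and method names of the sketch itself are made up.

    import org.apache.hawq.pxf.api.OneField;
    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.api.io.DataType;
    import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;

    import java.sql.PreparedStatement;
    import java.util.LinkedList;
    import java.util.List;

    public class ResolverWriteSketch {
        // resolver and statement are assumed to be created elsewhere (e.g. by the accessor)
        static void writeTuple(JdbcResolver resolver, PreparedStatement statement) throws Exception {
            // A tuple as HAWQ hands it to the WriteResolver: (id INTEGER, name TEXT)
            List<OneField> record = new LinkedList<>();
            OneField id = new OneField();
            id.type = DataType.INTEGER.getOID();
            id.val = 1;
            record.add(id);
            OneField name = new OneField();
            name.type = DataType.TEXT.getOID();
            name.val = "first";
            record.add(name);

            // setFields() checks the types and wraps the (possibly converted) tuple into a OneRow
            OneRow row = resolver.setFields(record);

            // decodeOneRowToPreparedStatement() binds every field to the statement parameters
            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
            statement.executeUpdate();
        }
    }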

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
index 541aa86..d6c8fba 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
@@ -18,9 +18,6 @@ package org.apache.hawq.pxf.plugins.jdbc;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hawq.pxf.api.LogicalFilter;
 import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
 import org.apache.hawq.pxf.api.BasicFilter;
@@ -29,81 +26,99 @@ import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.text.ParseException;
+
 /**
- * Parse filter object generated by parent class  {@link org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder},
- * and build WHERE statement.
- * For Multiple filters , currently only support HDOP_AND .
- * The unsupported Filter operation and  LogicalOperation ,will return null statement.
+ * A SQL WHERE clause builder
  *
+ * Parses filter objects generated by {@link org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder} and builds WHERE statements
+ * Only HDOP_AND is supported for multiple filters
  */
 public class WhereSQLBuilder extends JdbcFilterBuilder {
-    private InputData inputData;
-
     public WhereSQLBuilder(InputData input) {
         inputData = input;
     }
 
     /**
-     * 1.check for LogicalOperator, Jdbc currently only support HDOP_AND.
-     * 2.and convert to BasicFilter List.
+     * Insert WHERE constraints into a given query
+     * Note that if the filter is not supported, the query is left unchanged
+     *
+     * @param dbName Database name (affects the behaviour for DATE constraints)
+     * @param query SQL query to insert constraints into. The query may already contain other WHERE statements
+     *
+     * @throws ParseException if an error happens when parsing the constraints (provided to class constructor)
      */
-    private static List<BasicFilter> convertBasicFilterList(Object filter, List<BasicFilter> returnList) throws UnsupportedFilterException {
-        if (returnList == null)
-            returnList = new ArrayList<>();
-        if (filter instanceof BasicFilter) {
-            returnList.add((BasicFilter) filter);
-            return returnList;
+    public void buildWhereSQL(String dbName, StringBuilder query) throws ParseException {
+        if (!inputData.hasFilter()) {
+            return;
         }
-        LogicalFilter lfilter = (LogicalFilter) filter;
-        if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND)
-            throw new UnsupportedFilterException("unsupported LogicalOperation : " + lfilter.getOperator());
-        for (Object f : lfilter.getFilterList()) {
-            returnList = convertBasicFilterList(f, returnList);
-        }
-        return returnList;
-    }
 
-    public String buildWhereSQL(String db_product) throws Exception {
-        if (!inputData.hasFilter())
-            return null;
-        List<BasicFilter> filters = null;
         try {
+            StringBuilder prepared = new StringBuilder();
+            if (!query.toString().contains("WHERE")) {
+                prepared.append(" WHERE ");
+            }
+            else {
+                prepared.append(" AND ");
+            }
+
+            // Get constraints and parse them
             String filterString = inputData.getFilterString();
             Object filterObj = getFilterObject(filterString);
-
+            List<BasicFilter> filters = null;
             filters = convertBasicFilterList(filterObj, filters);
-            StringBuffer sb = new StringBuffer("1=1");
+
+            String andDivisor = "";
             for (Object obj : filters) {
-                BasicFilter filter = (BasicFilter) obj;
-                sb.append(" AND ");
+                prepared.append(andDivisor);
+                andDivisor = " AND ";
 
+                // Insert constraint column name
+                BasicFilter filter = (BasicFilter) obj;
                 ColumnDescriptor column = inputData.getColumn(filter.getColumn().index());
-                //the column name of filter
-                sb.append(column.columnName());
+                prepared.append(column.columnName());
 
-                //the operation of filter
+                // Insert constraint operator
                 FilterParser.Operation op = filter.getOperation();
                 switch (op) {
                     case HDOP_LT:
-                        sb.append("<");
+                        prepared.append(" < ");
                         break;
                     case HDOP_GT:
-                        sb.append(">");
+                        prepared.append(" > ");
                         break;
                     case HDOP_LE:
-                        sb.append("<=");
+                        prepared.append(" <= ");
                         break;
                     case HDOP_GE:
-                        sb.append(">=");
+                        prepared.append(" >= ");
                         break;
                     case HDOP_EQ:
-                        sb.append("=");
+                        prepared.append(" = ");
+                        break;
+                    case HDOP_LIKE:
+                        prepared.append(" LIKE ");
                         break;
+                    case HDOP_NE:
+                        prepared.append(" <> ");
+                        break;
+                    case HDOP_IS_NULL:
+                        prepared.append(" IS NULL");
+                        continue;
+                    case HDOP_IS_NOT_NULL:
+                        prepared.append(" IS NOT NULL");
+                        continue;
                     default:
-                        throw new UnsupportedFilterException("unsupported Filter operation : " + op);
+                        throw new UnsupportedFilterException("Unsupported Filter operation: " + op);
                 }
 
-                DbProduct dbProduct = DbProduct.getDbProduct(db_product);
+                // Insert constraint constant
+                DbProduct dbProduct = DbProduct.getDbProduct(dbName);
                 Object val = filter.getConstant().constant();
                 switch (DataType.get(column.columnTypeCode())) {
                     case SMALLINT:
@@ -112,29 +127,68 @@ public class WhereSQLBuilder extends JdbcFilterBuilder {
                     case FLOAT8:
                     case REAL:
                     case BOOLEAN:
-                        sb.append(val.toString());
+                        prepared.append(val.toString());
                         break;
                     case TEXT:
-                        sb.append("'").append(val.toString()).append("'");
+                        prepared.append("'").append(val.toString()).append("'");
                         break;
                     case DATE:
-                        //According to the database products, for the date field for special treatment.
-                        sb.append(dbProduct.wrapDate(val));
+                        // Date field has different format in different databases
+                        prepared.append(dbProduct.wrapDate(val));
+                        break;
+                    case TIMESTAMP:
+                        // Timestamp field has different format in different databases
+                        prepared.append(dbProduct.wrapTimestamp(val));
                         break;
                     default:
-                        throw new UnsupportedFilterException("unsupported column type for filtering : " + column.columnTypeCode());
+                        throw new UnsupportedFilterException("Unsupported column type for filtering: " + column.columnTypeCode());
                 }
-
             }
-            return sb.toString();
-        } catch (UnsupportedFilterException ex) {
-            return null;
+
+            // No exceptions were thrown, change the provided query
+            query.append(prepared);
+        }
+        catch (UnsupportedFilterException e) {
+            LOG.debug("WHERE clause is omitted: " + e.toString());
+            // Silence the exception and do not insert constraints
+        }
+    }
+
+    /**
+     * Convert filter object into a list of {@link BasicFilter}
+     *
+     * @param filter Filter object
+     * @param returnList A list of {@link BasicFilter} to append filters to. Must be null if the function is not called recursively
+     */
+    private static List<BasicFilter> convertBasicFilterList(Object filter, List<BasicFilter> returnList) throws UnsupportedFilterException {
+        if (returnList == null) {
+            returnList = new ArrayList<>();
+        }
+
+        if (filter instanceof BasicFilter) {
+            returnList.add((BasicFilter) filter);
+            return returnList;
+        }
+
+        LogicalFilter lfilter = (LogicalFilter) filter;
+        if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND) {
+            throw new UnsupportedFilterException("Logical operation '" + lfilter.getOperator() + "' is not supported");
+        }
+        for (Object f : lfilter.getFilterList()) {
+            returnList = convertBasicFilterList(f, returnList);
         }
+
+        return returnList;
     }
 
-    static class UnsupportedFilterException extends Exception {
-        UnsupportedFilterException(String message) {
+    private static class UnsupportedFilterException extends Exception {
+        UnsupportedFilterException(String message) {
             super(message);
         }
     }
+
+    private static final Log LOG = LogFactory.getLog(WhereSQLBuilder.class);
+
+    // {@link InputData} from PXF
+    private InputData inputData;
 }
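
A minimal usage sketch of the new buildWhereSQL() (illustrative, not part of this commit): instead of returning a standalone clause as before, it appends " WHERE ..." (or " AND ..." if the query already contains WHERE) to the given StringBuilder, and leaves the query untouched when the filter is unsupported. The table, column names and the "PostgreSQL" product name below are assumptions.

    import org.apache.hawq.pxf.api.utilities.InputData;
    import org.apache.hawq.pxf.plugins.jdbc.WhereSQLBuilder;

    public class WhereSketch {
        static String buildSelect(InputData inputData) throws Exception {
            StringBuilder query = new StringBuilder("SELECT id, cdate, amt FROM sales");

            // Appends the parsed constraints, e.g. " WHERE id = 1"; unsupported
            // filter operations are silenced and the query stays unchanged
            WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
            builder.buildWhereSQL("PostgreSQL", query);

            return query.toString();
        }
    }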

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
index cdca8a6..bb79c84 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
@@ -19,29 +19,34 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  * under the License.
  */
 
-
 import org.apache.commons.lang.ArrayUtils;
 
 /**
- * A tool class, used to deal with byte array merging, split and other methods.
+ * A tool class for byte array merging, splitting and conversion
  */
 public class ByteUtil {
-
     public static byte[] mergeBytes(byte[] b1, byte[] b2) {
         return ArrayUtils.addAll(b1,b2);
     }
 
-    public static byte[][] splitBytes(byte[] bytes, int n) {
-        int len = bytes.length / n;
+    /**
+     * Split a byte[] array into two arrays, each of which represents a value of type long
+     */
+    public static byte[][] splitBytes(byte[] bytes) {
+        final int N = 8;
+        int len = bytes.length / N;
         byte[][] newBytes = new byte[len][];
         int j = 0;
         for (int i = 0; i < len; i++) {
-            newBytes[i] = new byte[n];
-            for (int k = 0; k < n; k++) newBytes[i][k] = bytes[j++];
+            newBytes[i] = new byte[N];
+            for (int k = 0; k < N; k++) newBytes[i][k] = bytes[j++];
         }
         return newBytes;
     }
 
+    /**
+     * Convert a value of type long to a byte[] array
+     */
     public static byte[] getBytes(long value) {
         byte[] b = new byte[8];
         b[0] = (byte) ((value >> 56) & 0xFF);
@@ -55,22 +60,9 @@ public class ByteUtil {
         return b;
     }
 
-    public static byte[] getBytes(int value) {
-        byte[] b = new byte[4];
-        b[0] = (byte) ((value >> 24) & 0xFF);
-        b[1] = (byte) ((value >> 16) & 0xFF);
-        b[2] = (byte) ((value >> 8) & 0xFF);
-        b[3] = (byte) ((value >> 0) & 0xFF);
-        return b;
-    }
-
-    public static int toInt(byte[] b) {
-        return (((((int) b[3]) & 0xFF) << 32) +
-                ((((int) b[2]) & 0xFF) << 40) +
-                ((((int) b[1]) & 0xFF) << 48) +
-                ((((int) b[0]) & 0xFF) << 56));
-    }
-
+    /**
+     * Convert a byte[] array to a value of type long
+     */
     public static long toLong(byte[] b) {
         return ((((long) b[7]) & 0xFF) +
                 ((((long) b[6]) & 0xFF) << 8) +

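With the int helpers removed, fragment metadata is always a pair of longs. A round-trip sketch (illustrative, mirrors the updated fragmenter test below):

    import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;

    public class FragmentMetaSketch {
        public static void main(String[] args) {
            // Pack the fragment boundaries (two longs) into a single metadata array
            byte[] meta = ByteUtil.mergeBytes(ByteUtil.getBytes(2001L), ByteUtil.getBytes(2003L));

            // splitBytes() now always splits into 8-byte chunks, one per long value
            byte[][] parts = ByteUtil.splitBytes(meta);
            long fragStart = ByteUtil.toLong(parts[0]);   // 2001
            long fragEnd = ByteUtil.toLong(parts[1]);     // 2003
            System.out.println(fragStart + " - " + fragEnd);
        }
    }
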
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
index 30ff1fe..c8b8cfb 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
@@ -20,14 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * As the syntax of different database products are not the same, such as the date type  field for processing, ORACLE use to_date () function, and mysql use Date () function.
- So we create this class to abstract public methods, the specific database products can implementation of these  methods.
+ * A tool class to process data types that must be represented differently in different databases.
+ * Such processing is required to build correct constraints (WHERE statements).
  */
 public abstract class DbProduct {
-    //wrap date string
-    public abstract String wrapDate(Object date_val);
-
-
+    /**
+     * Get an instance of the DbProduct subclass that corresponds to the given database
+     *
+     * @param dbName the full name of the database
+     * @return a DbProduct of the required subclass
+     */
     public static DbProduct getDbProduct(String dbName) {
         if (dbName.toUpperCase().contains("MYSQL"))
             return new MysqlProduct();
@@ -35,15 +37,40 @@ public abstract class DbProduct {
             return new OracleProduct();
         else if (dbName.toUpperCase().contains("POSTGRES"))
             return new PostgresProduct();
+        else if (dbName.toUpperCase().contains("MICROSOFT"))
+            return new MicrosoftProduct();
         else
-            //Unsupported databases may execute errors
             return new CommonProduct();
     }
+
+    /**
+     * Wraps a given date value the way required by a target database
+     *
+     * @param val {@link java.sql.Date} object to wrap
+     * @return a string with a properly wrapped date object
+     */
+    public abstract String wrapDate(Object val);
+
+    /**
+     * Wraps a given timestamp value the way required by a target database
+     *
+     * @param val {@link java.sql.Timestamp} object to wrap
+     * @return a string with a properly wrapped timestamp object
+     */
+    public abstract String wrapTimestamp(Object val);
 }
 
+/**
+ * Common product. Used when no other products are available
+ */
 class CommonProduct extends DbProduct {
     @Override
-    public String wrapDate(Object dateVal) {
-        return "date'" + dateVal + "'";
+    public String wrapDate(Object val) {
+        return "date'" + val + "'";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
     }
 }
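
The DbProduct hierarchy only affects how DATE and TIMESTAMP constants are quoted in generated WHERE clauses. A short dispatch sketch (illustrative; the lookup strings are assumptions, the resulting literals follow the implementations in this commit):

    import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;

    public class DbProductSketch {
        public static void main(String[] args) {
            // The product is chosen by a substring match on the database name
            DbProduct oracle = DbProduct.getDbProduct("Oracle");
            DbProduct postgres = DbProduct.getDbProduct("PostgreSQL");

            System.out.println(oracle.wrapDate("2008-02-01"));     // to_date('2008-02-01', 'YYYY-MM-DD')
            System.out.println(postgres.wrapDate("2008-02-01"));   // date'2008-02-01'
            System.out.println(postgres.wrapTimestamp("2008-02-01 00:00:00"));  // '2008-02-01 00:00:00'
        }
    }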

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
new file mode 100644
index 0000000..5cec52d
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java
@@ -0,0 +1,35 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implements methods for the Microsoft SQL Server database
+ */
+public class MicrosoftProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object val){
+        return "'" + val + "'";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
+    }
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
index 2e60ada..27f7605 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
@@ -20,12 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * Implements methods for MySQL Database.
+ * Implements methods for the MySQL Database.
  */
 public class MysqlProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object val){
+        return "DATE('" + val + "')";
+    }
 
     @Override
-    public String wrapDate(Object dateVal){
-        return "DATE('" + dateVal + "')";
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
index b46c5f3..c2c656b 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
@@ -20,11 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * Implements methods for Oracle Database.
+ * Implements methods for the Oracle Database.
  */
 public class OracleProduct extends DbProduct {
     @Override
-    public String wrapDate(Object dateVal) {
-        return "to_date('" + dateVal + "','yyyy-mm-dd')";
+    public String wrapDate(Object val) {
+        return "to_date('" + val + "', 'YYYY-MM-DD')";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "to_timestamp('" + val + "', 'YYYY-MM-DD HH:MI:SS.FF')";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
index 901cf2e..c25ec96 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
@@ -20,11 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils;
  */
 
 /**
- * Implements methods for Postgres Database.
+ * Implements methods for the PostgreSQL database.
  */
 public class PostgresProduct extends DbProduct {
     @Override
-    public String wrapDate(Object dateVal) {
-        return "date'" + dateVal + "'";
+    public String wrapDate(Object val) {
+        return "date'" + val + "'";
+    }
+
+    @Override
+    public String wrapTimestamp(Object val) {
+        return "'" + val + "'";
     }
 }

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
new file mode 100644
index 0000000..3e1404c
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java
@@ -0,0 +1,109 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * This writer makes batch INSERTs.
+ *
+ * A call() is required after a certain number of supply() calls
+ */
+class BatchWriterCallable implements WriterCallable {
+    @Override
+    public void supply(OneRow row) throws IllegalStateException {
+        if ((batchSize > 0) && (rows.size() >= batchSize)) {
+            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
+        }
+        if (row == null) {
+            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
+        }
+        rows.add(row);
+    }
+
+    @Override
+    public boolean isCallRequired() {
+        return (batchSize > 0) && (rows.size() >= batchSize);
+    }
+
+    @Override
+    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
+        if (rows.isEmpty()) {
+            return null;
+        }
+
+        boolean statementMustBeDeleted = false;
+        if (statement == null) {
+            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
+            statementMustBeDeleted = true;
+        }
+
+        for (OneRow row : rows) {
+            JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
+            statement.addBatch();
+        }
+
+        try {
+            statement.executeBatch();
+        }
+        catch (SQLException e) {
+            return e;
+        }
+        finally {
+            rows.clear();
+            if (statementMustBeDeleted) {
+                JdbcPlugin.closeStatement(statement);
+                statement = null;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Construct a new batch writer
+     */
+    BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int batchSize) {
+        if (plugin == null || query == null) {
+            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
+        }
+
+        this.plugin = plugin;
+        this.query = query;
+        this.statement = statement;
+        this.batchSize = batchSize;
+
+        rows = new LinkedList<>();
+    }
+
+    private final JdbcPlugin plugin;
+    private final String query;
+    private PreparedStatement statement;
+    private List<OneRow> rows;
+    private final int batchSize;
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
new file mode 100644
index 0000000..63dbb29
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java
@@ -0,0 +1,102 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * This writer makes simple, one-by-one INSERTs.
+ *
+ * A call() is required after every supply()
+ */
+class SimpleWriterCallable implements WriterCallable {
+    @Override
+    public void supply(OneRow row) throws IllegalStateException {
+        if (this.row != null) {
+            throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable");
+        }
+        if (row == null) {
+            throw new IllegalArgumentException("Trying to supply() a null OneRow object");
+        }
+        this.row = row;
+    }
+
+    @Override
+    public boolean isCallRequired() {
+        return this.row != null;
+    }
+
+    @Override
+    public SQLException call() throws IOException, SQLException, ClassNotFoundException {
+        if (row == null) {
+            return null;
+        }
+
+        boolean statementMustBeDeleted = false;
+        if (statement == null) {
+            statement = plugin.getPreparedStatement(plugin.getConnection(), query);
+            statementMustBeDeleted = true;
+        }
+
+        JdbcResolver.decodeOneRowToPreparedStatement(row, statement);
+
+        try {
+            if (statement.executeUpdate() != 1) {
+                throw new SQLException("The number of rows affected by INSERT query is not equal to the number of rows provided");
+            }
+        }
+        catch (SQLException e) {
+            return e;
+        }
+        finally {
+            row = null;
+            if (statementMustBeDeleted) {
+                JdbcPlugin.closeStatement(statement);
+                statement = null;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Construct a new simple writer
+     */
+    SimpleWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement) {
+        if ((plugin == null) || (query == null)) {
+            throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null");
+        }
+        this.plugin = plugin;
+        this.query = query;
+        this.statement = statement;
+        row = null;
+    }
+
+    private final JdbcPlugin plugin;
+    private final String query;
+    private PreparedStatement statement;
+    private OneRow row;
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
new file mode 100644
index 0000000..e2a6916
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java
@@ -0,0 +1,56 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+
+import java.util.concurrent.Callable;
+import java.sql.SQLException;
+
+/**
+ * An object that processes INSERT operations on {@link OneRow} objects
+ */
+public interface WriterCallable extends Callable<SQLException> {
+    /**
+     * Pass the next OneRow to this WriterCallable.
+     *
+     * @throws IllegalStateException if this WriterCallable must be call()ed before the next call to supply()
+     */
+    void supply(OneRow row) throws IllegalStateException;
+
+    /**
+     * Check whether this WriterCallable must be called
+     *
+     * @return true if this WriterCallable must be call()ed before the next call to supply(), false otherwise
+     */
+    boolean isCallRequired();
+
+    /**
+     * Execute an INSERT query.
+     *
+     * @return a SQLException that happened when executing the query, or null if the query succeeded or was empty (nothing to execute)
+     *
+     * @throws Exception an exception that happened during execution, but that is not related to the execution of the query itself (for instance, it may originate from {@link java.sql.PreparedStatement} close() method)
+     */
+    @Override
+    SQLException call() throws Exception;
+}
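
The contract is: supply() rows until isCallRequired() turns true, then call() to flush; call() returns (rather than throws) the SQLException produced by the INSERT itself. A sketch of how a caller might drive a WriterCallable (illustrative, not taken from this commit; the rows source is an assumption):

    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;

    import java.sql.SQLException;
    import java.util.List;

    public class WriterDriveSketch {
        static void writeAll(WriterCallable writer, List<OneRow> rows) throws Exception {
            for (OneRow row : rows) {
                writer.supply(row);
                if (writer.isCallRequired()) {
                    SQLException failure = writer.call();   // flush the accumulated rows
                    if (failure != null) {
                        throw failure;
                    }
                }
            }
            // Flush whatever is left; call() on an empty writer simply returns null
            SQLException failure = writer.call();
            if (failure != null) {
                throw failure;
            }
        }
    }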

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
new file mode 100644
index 0000000..aaf13bd
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java
@@ -0,0 +1,97 @@
+package org.apache.hawq.pxf.plugins.jdbc.writercallable;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
+
+import java.sql.PreparedStatement;
+
+/**
+ * A factory for {@link WriterCallable} objects that process INSERT operations on {@link OneRow} objects
+ */
+public class WriterCallableFactory {
+    /**
+     * Create a new {@link WriterCallable} factory.
+     *
+     * Note that 'setPlugin' and 'setQuery' must be called before construction of a {@link WriterCallable}.
+     *
+     * By default, 'statement' is null
+     */
+    public WriterCallableFactory() {
+        batchSize = JdbcPlugin.DEFAULT_BATCH_SIZE;
+        plugin = null;
+        query = null;
+        statement = null;
+    }
+
+    /**
+     * Get an instance of WriterCallable
+     *
+     * @return an implementation of WriterCallable, chosen based on parameters that were set for this factory
+     */
+    public WriterCallable get() {
+        if (batchSize > 1) {
+            return new BatchWriterCallable(plugin, query, statement, batchSize);
+        }
+        return new SimpleWriterCallable(plugin, query, statement);
+    }
+
+    /**
+     * Set {@link JdbcPlugin} to use.
+     * REQUIRED
+     */
+    public void setPlugin(JdbcPlugin plugin) {
+        this.plugin = plugin;
+    }
+
+    /**
+     * Set SQL query to use.
+     * REQUIRED
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+    /**
+     * Set batch size to use
+     *
+     * @param batchSize the batch size; if greater than 1, batches of the given size are used, otherwise rows are written one by one
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Set statement to use.
+     *
+     * @param statement if null, a new connection and a new statement are created each time the {@link WriterCallable} is called; otherwise, the given statement is used and is not closed or reopened
+     */
+    public void setStatement(PreparedStatement statement) {
+        this.statement = statement;
+    }
+
+    private int batchSize;
+    private JdbcPlugin plugin;
+    private String query;
+    private PreparedStatement statement;
+}
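
A minimal factory configuration sketch (illustrative, not part of this commit): it assumes a JdbcPlugin set up elsewhere, and the INSERT query text is an assumption.

    import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;
    import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
    import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;

    public class WriterFactorySketch {
        static WriterCallable newWriter(JdbcPlugin plugin) {
            WriterCallableFactory factory = new WriterCallableFactory();
            factory.setPlugin(plugin);                            // required
            factory.setQuery("INSERT INTO sales VALUES (?, ?)");  // required; query text is illustrative
            factory.setBatchSize(100);                            // > 1 selects the batched writer
            factory.setStatement(null);                           // null: each call() opens its own statement

            // Returns a batch writer here; with batchSize <= 1 a one-by-one writer is returned
            return factory.get();
        }
    }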

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
index 6785af6..33ae585 100644
--- a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
+++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
@@ -56,7 +56,7 @@ public class JdbcPartitionFragmenterTest {
 
         //fragment - 1
         byte[] fragMeta = fragments.get(0).getMetadata();
-        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 8);
+        byte[][] newBytes = ByteUtil.splitBytes(fragMeta);
         long fragStart = ByteUtil.toLong(newBytes[0]);
         long fragEnd = ByteUtil.toLong(newBytes[1]);
         assertDateEquals(fragStart, 2008, 1, 1);
@@ -64,7 +64,7 @@ public class JdbcPartitionFragmenterTest {
 
         //fragment - 12
         fragMeta = fragments.get(11).getMetadata();
-        newBytes = ByteUtil.splitBytes(fragMeta, 8);
+        newBytes = ByteUtil.splitBytes(fragMeta);
         fragStart = ByteUtil.toLong(newBytes[0]);
         fragEnd = ByteUtil.toLong(newBytes[1]);
         assertDateEquals(fragStart, 2008, 12, 1);
@@ -102,17 +102,17 @@ public class JdbcPartitionFragmenterTest {
 
         //fragment - 1
         byte[] fragMeta = fragments.get(0).getMetadata();
-        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 4);
-        int fragStart = ByteUtil.toInt(newBytes[0]);
-        int fragEnd = ByteUtil.toInt(newBytes[1]);
+        byte[][] newBytes = ByteUtil.splitBytes(fragMeta);
+        long fragStart = ByteUtil.toLong(newBytes[0]);
+        long fragEnd = ByteUtil.toLong(newBytes[1]);
         assertEquals(2001, fragStart);
         assertEquals(2003, fragEnd);
 
         //fragment - 6
         fragMeta = fragments.get(5).getMetadata();
-        newBytes = ByteUtil.splitBytes(fragMeta, 4);
-        fragStart = ByteUtil.toInt(newBytes[0]);
-        fragEnd = ByteUtil.toInt(newBytes[1]);
+        newBytes = ByteUtil.splitBytes(fragMeta);
+        fragStart = ByteUtil.toLong(newBytes[0]);
+        fragEnd = ByteUtil.toLong(newBytes[1]);
         assertEquals(2011, fragStart);
         assertEquals(2012, fragEnd);
 

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
index ebe367d..de173b8 100644
--- a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
+++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
@@ -59,44 +59,53 @@ public class SqlBuilderTest {
     public void testIdFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");//id=1
+        // id = 1
+        when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals("1=1 AND id=1", builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals(" WHERE id = 1", sb.toString());
     }
 
     @Test
     public void testDateAndAmtFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        // cdate>'2008-02-01' and cdate<'2008-12-01' and amt > 1200
+        // cdate > '2008-02-01' and cdate < '2008-12-01' and amt > 1200
         when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l0");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals("1=1 AND cdate>DATE('2008-02-01') AND cdate<DATE('2008-12-01') AND amt>1200"
-                , builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals(" WHERE cdate > DATE('2008-02-01') AND cdate < DATE('2008-12-01') AND amt > 1200"
+                , sb.toString());
     }
 
     @Test
     public void testUnsupportedOperationFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        // grade like 'bad'
-        when(inputData.getFilterString()).thenReturn("a3c25s3dbado7");
+        // IN 'bad'
+        when(inputData.getFilterString()).thenReturn("a3c25s3dbado10");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals(null, builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals("", sb.toString());
     }
 
     @Test
     public void testUnsupportedLogicalFilter() throws Exception {
         prepareConstruction();
         when(inputData.hasFilter()).thenReturn(true);
-        // cdate>'2008-02-01' or amt < 1200
+        // cdate > '2008-02-01' or amt < 1200
         when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a2c20s4d1200o2l1");
 
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        assertEquals(null, builder.buildWhereSQL(DB_PRODUCT));
+        StringBuilder sb = new StringBuilder();
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals("", sb.toString());
     }
 
     @Test
@@ -110,11 +119,11 @@ public class SqlBuilderTest {
         List<Fragment> fragments = fragment.getFragments();
         assertEquals(6, fragments.size());
 
-        //partition-1 : cdate>=2008-01-01 and cdate<2008-03-01
+        // Partition: cdate >= 2008-01-01 and cdate < 2008-03-01
         when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
-        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL);
-        assertEquals(ORIGINAL_SQL + " WHERE 1=1  AND " +
-                "cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", fragmentSql);
+        StringBuilder sb = new StringBuilder(ORIGINAL_SQL);
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL + " WHERE cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", sb.toString());
     }
 
     @Test
@@ -125,19 +134,19 @@ public class SqlBuilderTest {
         when(inputData.getUserProperty("PARTITION_BY")).thenReturn("grade:enum");
         when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad");
 
+        StringBuilder sb = new StringBuilder(ORIGINAL_SQL);
         WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
-        String whereSql = builder.buildWhereSQL(DB_PRODUCT);
-        assertEquals("1=1 AND id>5", whereSql);
+        builder.buildWhereSQL(DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL + " WHERE id > 5", sb.toString());
 
         JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
         List<Fragment> fragments = fragment.getFragments();
 
-        //partition-1 : id>5 and grade='excellent'
+        // Partition: id > 5 and grade = 'excellent'
         when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
 
-        String filterSql = ORIGINAL_SQL + " WHERE " + whereSql;
-        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, filterSql);
-        assertEquals(filterSql + " AND grade='excellent'", fragmentSql);
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL + " WHERE id > 5 AND grade = 'excellent'", sb.toString());
     }
 
     @Test
@@ -150,8 +159,9 @@ public class SqlBuilderTest {
 
         when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
 
-        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL);
-        assertEquals(ORIGINAL_SQL, fragmentSql);
+        StringBuilder sb = new StringBuilder(ORIGINAL_SQL);
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb);
+        assertEquals(ORIGINAL_SQL, sb.toString());
     }
 
 

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index 252791e..0cddad1 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -177,11 +177,11 @@ under the License.
     </profile>
     <profile>
         <name>Jdbc</name>
-        <description>A profile for reading data into HAWQ via JDBC</description>
+        <description>A profile for reading and writing data via JDBC</description>
         <plugins>
             <fragmenter>org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter</fragmenter>
-            <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor</accessor>
-            <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
+            <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcResolver</resolver>
         </plugins>
     </profile>
     <profile>


[2/2] hawq git commit: HAWQ-1605. Support INSERT in PXF JDBC plugin

Posted by ad...@apache.org.
HAWQ-1605. Support INSERT in PXF JDBC plugin

(closes #1353)

Fix incorrect TIMESTAMP handling

PXF JDBC plugin update

* Add support for INSERT queries:
	* The INSERT queries are processed by the same classes as the SELECT queries;
	* INSERTs are processed by the JDBC PreparedStatement;
	* INSERTs support batching (by means of JDBC);

* Minor changes in WhereSQLBuilder and JdbcPartitionFragmenter:
	* Removed 'WHERE 1=1';
	* The same pattern of spaces around operators everywhere ('a = b', not 'a=b');
	* JdbcPartitionFragmenter.buildFragmenterSql() made static to avoid extra checks of InputData (proposed by @sansanichfb);

* Refactoring and some microoptimizations;

PXF JDBC refactoring

* The README.md is completely rewritten;

* Lots of changes in comments and javadoc comments;

* Code refactoring and minor changes in codestyle

Fixes proposed by @sansanichfb

Add DbProduct for Microsoft SQL Server

Notes on consistency in README and errors

* Add an explicit note on consistency of INSERT queries (it is not guaranteed).

* Change error message on INSERT failure

* Minor corrections of README

The fixes were proposed by @sansanichfb

Improve WhereSQLBuilder

* Add support of TIMESTAMP values;

* Add support of operations <>, LIKE, IS NULL, IS NOT NULL.

Fix proposed by @sansanichfb

Throw an exception when trying to open an already open connection when writing to an external database using `openForWrite()`.

Although the behaviour is different in case of `openForRead()`, that approach does not apply here. A second call to `openForWrite()` could be made from another thread, and that would result in a race: the `PreparedStatement` used to write to the external database is the same object for all threads, and the procedure `writeNextObject()` is not `synchronized` (or protected in some other way).

Simplify logging; BatchUpdateException

Simplify logging so that the logs produced by pxf-jdbc do not grow too large when DEBUG is enabled (the removed logging calls reported field types and names, which in most cases are the same as in the data provided; exceptions are still being logged).

Add processing of BatchUpdateException, so that the real cause of an exception is returned to the user.

PXF JDBC thread pool support

Implement support for multi-threaded processing of INSERT queries, using a thread pool. To use the feature, set the parameter POOL_SIZE in the LOCATION clause of an external table (<1: the pool size is equal to the number of CPUs available to the JVM; =1: disable the thread pool; >1: use the given pool size).

Not all operations are processed by pool threads: pool threads only execute() the queries, but they do not fill the PreparedStatement from OneRow.
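
An illustrative LOCATION clause (a sketch only: the host, database and driver options are copied from the README example in this change, and POOL_SIZE=4 is an arbitrary value):

    LOCATION ('pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root&POOL_SIZE=4')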

Redesign connection pooling

* Redesign connection pooling: move OneRow object processing to the pool threads. This decreases the load on the single-threaded part of PXF;

* Introduce WriterCallable & related classes. This significantly simplifies the code of JdbcAccessor, makes it easy to introduce new methods of processing INSERT queries, and enables quick hardcoded tweaks for the same purpose.

* Add docs on thread pool feature

Support long values in PARTITION clause

Support values of the Java primitive type 'long' in the PARTITION clause (both for the RANGE and INTERVAL parameters).

* Modify JdbcPartitionFragmenter (convert all int variables to long)
* Move parsing of INTERVAL values for PARTITION_TYPE "INT" to class constructor (and add a parse exception handler)
* Simplify ByteUtil (remove methods to deal with values of type 'int')
* Update JdbcPartitionFragmenterTest
* Minor changes in comments

Fix pxf-profiles-default.xml

Remove the ampersand from the description of the JDBC profile in pxf-profiles-default.xml

Remove explicit throws of IllegalArgumentException

Remove explicit references to 'IllegalArgumentException', as the caller is probably unable to recover from them.
'IllegalStateException' is left unchanged, as it is thrown when the caller must perform an action that will resolve the problem ('WriterCallable' is full).

Other runtime exceptions are explicitly listed in function definitions as before; their causes are usually known to the caller, so it can handle them or at least report a more meaningful error message to the user.

Proposed by Alex Denissov <ad...@pivotal.io>

Simplify isCallRequired()

Make function 'isCallRequired()' body a one-line expression in all implementations of 'WriterCallable'.

Proposed by Alex Denissov <ad...@pivotal.io>

Remove rollback and change BATCH_SIZE logic

Remove calls to 'tryRollback()' and all processing of rollbacks in INSERT queries.
The reason for the change is that rollback is effective in only one case: the INSERT is performed from a single PXF segment using a single thread, and the external database supports transactions. In most cases, more than one PXF segment performs the INSERT, and rollback is of no use then.
On the other hand, rollback logic is cumbersome and notably increases code complexity.

Due to the removal of rollback, there is no longer a need to keep BATCH_SIZE infinite as often as possible (when BATCH_SIZE is infinite, there are fewer scenarios in which rollback() fails, though their number is not zero).
Thus, setting a recommended (https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754) value makes sense.
The old logic of infinite batch size also remains active.

Modify README.md: minor corrections, new BATCH_SIZE logic

Proposed by Alex Denissov <ad...@pivotal.io>

Change BATCH_SIZE logic

* Modify BATCH_SIZE parameter processing according to new proposals https://github.com/apache/hawq/pull/1353#discussion_r214413534
* Update README.md
* Restore fallback to non-batched INSERTs in case the external database (or JDBC connector) does not support batch updates

Proposed by Alex Denissov <ad...@pivotal.io>
Proposed by Dmitriy Pavlov <pa...@gmail.com>

Modify processing of BATCH_SIZE parameter

Modify BATCH_SIZE parameter processing according to the proposal https://github.com/apache/hawq/pull/1353#discussion_r215023775:
* Update allowed values of BATCH_SIZE and their meanings
* Introduce explicit flag of presentness of a BATCH_SIZE parameter
* Introduce DEFAULT_BATCH_SIZE constant in JdbcPlugin
* Move processing of BATCH_SIZE values to JdbcAccessor
* Update README.md

Proposed by @divyabhargov, @denalex

Fix column type for columns converted to TEXT

Modify column type processing so that the column type is set correctly for fields that:
* Are represented as columns of type TEXT by GPDBWritable, but whose actual type is different
* Contain a NULL value

Before, the column type code was not set correctly for such columns because of a check for a NULL field value.

Proposed and authored by @divyabhargov

removed parseUnsignedInt


Project: http://git-wip-us.apache.org/repos/asf/hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/hawq/commit/472fa2b7
Tree: http://git-wip-us.apache.org/repos/asf/hawq/tree/472fa2b7
Diff: http://git-wip-us.apache.org/repos/asf/hawq/diff/472fa2b7

Branch: refs/heads/master
Commit: 472fa2b74679a5d2b6fb08dd107b292a95e577b7
Parents: a741655
Author: Ivan Leskin <le...@phystech.edu>
Authored: Mon Mar 5 17:19:26 2018 +0300
Committer: Alexander Denissov <ad...@pivotal.io>
Committed: Fri Sep 7 12:23:24 2018 -0700

----------------------------------------------------------------------
 pxf/pxf-jdbc/README.md                          | 341 ++++++++++------
 .../hawq/pxf/plugins/jdbc/JdbcAccessor.java     | 353 +++++++++++++++++
 .../pxf/plugins/jdbc/JdbcFilterBuilder.java     |  75 ++--
 .../plugins/jdbc/JdbcPartitionFragmenter.java   | 391 ++++++++++---------
 .../hawq/pxf/plugins/jdbc/JdbcPlugin.java       | 226 ++++++++---
 .../hawq/pxf/plugins/jdbc/JdbcReadAccessor.java | 122 ------
 .../hawq/pxf/plugins/jdbc/JdbcReadResolver.java | 103 -----
 .../hawq/pxf/plugins/jdbc/JdbcResolver.java     | 367 +++++++++++++++++
 .../hawq/pxf/plugins/jdbc/WhereSQLBuilder.java  | 162 +++++---
 .../hawq/pxf/plugins/jdbc/utils/ByteUtil.java   |  38 +-
 .../hawq/pxf/plugins/jdbc/utils/DbProduct.java  |  45 ++-
 .../plugins/jdbc/utils/MicrosoftProduct.java    |  35 ++
 .../pxf/plugins/jdbc/utils/MysqlProduct.java    |  10 +-
 .../pxf/plugins/jdbc/utils/OracleProduct.java   |  11 +-
 .../pxf/plugins/jdbc/utils/PostgresProduct.java |  11 +-
 .../writercallable/BatchWriterCallable.java     | 109 ++++++
 .../writercallable/SimpleWriterCallable.java    | 102 +++++
 .../jdbc/writercallable/WriterCallable.java     |  56 +++
 .../writercallable/WriterCallableFactory.java   |  97 +++++
 .../jdbc/JdbcPartitionFragmenterTest.java       |  16 +-
 .../hawq/pxf/plugins/jdbc/SqlBuilderTest.java   |  54 +--
 .../src/main/resources/pxf-profiles-default.xml |   6 +-
 22 files changed, 1984 insertions(+), 746 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/README.md
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/README.md b/pxf/pxf-jdbc/README.md
index c158f32..fde1641 100644
--- a/pxf/pxf-jdbc/README.md
+++ b/pxf/pxf-jdbc/README.md
@@ -1,139 +1,254 @@
-# Accessing Jdbc Table Data
+# PXF JDBC plugin
 
-The PXF JDBC plug-in reads data stored in Traditional relational database,ie : mysql,ORACLE,postgresql.
+The PXF JDBC plugin provides access to external databases that implement [the Java Database Connectivity API](http://www.oracle.com/technetwork/java/javase/jdbc/index.html). Both read (SELECT) and write (INSERT) operations are supported by the plugin.
 
-PXF-JDBC plug-in is the client of the database, the host running the database engine does not need to
-deploy PXF.
+The PXF JDBC plugin is a JDBC client. The host running the external database does not need to have PXF deployed.
 
 
-# Prerequisites
+## Prerequisites
 
-Check the following before using PXF to access JDBC Table:
-* The PXF JDBC plug-in is installed on all cluster nodes.
-* The JDBC JAR files are installed on all cluster nodes, and added to file - 'pxf-public.classpath'
-* You have tested PXF on HDFS.
+Check the following before using the PXF JDBC plugin:
 
-# Using PXF Tables to Query JDBC Table
-Jdbc tables are defined in same schema in PXF.The PXF table has the same column name
-as Jdbc Table, and the column type requires a mapping of Jdbc-HAWQ.
+* The PXF JDBC plugin is installed on all PXF nodes;
+* The JDBC driver for the external database is installed on all PXF nodes;
+* All PXF nodes are allowed to connect to the external database.
 
-## Syntax Example
-The following PXF table definition is valid for Jdbc Table.
 
-    CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name
-        ( column_name data_type [, ...] | LIKE other_table )
-    LOCATION ('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>')
-    FORMAT 'CUSTOM' (formatter='pxfwritable_import')
-If `jdbc-schema-name` is omitted, pxf will default to the `default` schema.
+## Limitations
 
-The `column_name` must exists in jdbc-table,`data_type` equals or similar to
-the jdbc-column type.
+The **PXF table** and the **table in the external database** must have the **same definition**: their columns must have the same names, and the columns' types must correspond.
 
-where `<pxf-parameters>` is:
+**Not all data types are supported** by the plugin. The following PXF data types are supported:
 
-    [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
-    &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor
-    &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver]
-    | PROFILE=Jdbc
+* `INTEGER`, `BIGINT`, `SMALLINT`
+* `REAL`, `FLOAT8`
+* `NUMERIC`
+* `BOOLEAN`
+* `VARCHAR`, `BPCHAR`, `TEXT`
+* `DATE`
+* `TIMESTAMP`
+* `BYTEA`
 
-where `<custom-parameters>` is:
+The `<full_external_table_name>` (see below) **must not match** the [pattern](https://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html) `/.*/[0-9]*-[0-9]*_[0-9]*`: the name must not both start with `/` and end with `/` followed by three groups of digits of arbitrary length, the first two separated by `-` and the last two separated by `_`. For example, the table name `/public.table/1-2_3` is not allowed.
 
-    JDBC_DRIVER=<jdbc-driver-class-name>
-     &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password>
+At the moment, one PXF external table cannot serve both SELECT and INSERT queries. A separate PXF external table is required for each type of query.
 
-## Jdbc Table to HAWQ Data Type Mapping
-Jdbc-table and hawq-table data type system is similar to, does not require
-a special type of mapping.
-# Usage
-The following to mysql, for example, describes the use of PDF-JDBC.
 
-To query MySQL Table in HAWQ, perform the following steps:
-1. create Table in MySQL
+## Syntax
+```
+CREATE [ READABLE | WRITABLE ] EXTERNAL TABLE <table_name> (
+    { <column_name> <data_type> [, ...] | LIKE <other_table> }
+)
+LOCATION (
+    'pxf://<full_external_table_name>?<pxf_parameters><jdbc_required_parameters><jdbc_login_parameters><plugin_parameters>'
+)
+FORMAT 'CUSTOM' (FORMATTER={'pxfwritable_import' | 'pxfwritable_export'})
+```
 
-         mysql> use demodb;
-         mysql> create table myclass(
-                 id int(4) not null primary key,
-                 name varchar(20) not null,
-                 gender int(4) not null default '0',
-                 degree double(16,2));`
-2. insert test data
+The **`<pxf_parameters>`** are:
+```
+{
+PROFILE=JDBC
+|
+FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
+&ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor
+&RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcResolver
+}
+```
 
-        insert into myclass values(1,"tom",1,90);
-        insert into myclass values(2,'john',0,94);
-        insert into myclass values(3,'simon',1,79);
-3. copy mysql-jdbc jar files to `/usr/lib/pxf` (on all cluster nodes), and
-edit `/etc/pxf/conf/pxf-public.classpath` , add :
+The **`<jdbc_required_parameters>`** are:
+```
+&JDBC_DRIVER=<external_database_jdbc_driver>
+&DB_URL=<external_database_url>
+```
 
-        /usr/lib/pxf/mysql-connector-java-*.jar
+The **`<jdbc_login_parameters>`** are **optional**, but if provided, both of them must be present:
+```
+&USER=<external_database_login>
+&PASS=<external_database_password>
+```
 
-     Restart all pxf-engine.
+The **`<plugin_parameters>`** are **optional**:
 
-4. create Table in HAWQ:
+```
+[
+&BATCH_SIZE=<batch_size>
+]
+[
+&POOL_SIZE=<pool_size>
+]
+[
+&PARTITION_BY=<column>:<column_type>
+&RANGE=<start_value>:<end_value>
+[&INTERVAL=<value>[:<unit>]]
+]
+```
 
-        gpadmin=# CREATE EXTERNAL TABLE myclass(id integer,
-             name text,
-             gender integer,
-             degree float8)
-             LOCATION ('pxf://localhost:51200/demodb.myclass'
-                     '?PROFILE=JDBC'
-                     '&JDBC_DRIVER=com.mysql.jdbc.Driver'
-                     '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
-                     )
-            FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
+The meaning of `BATCH_SIZE` is given in section [batching of INSERT queries](#Batching).
 
-MySQL instance IP: 192.168.200.6, port: 3306.
+The meaning of `POOL_SIZE` is given in section [using thread pool for INSERT queries](#Thread_pool).
 
-5. query mysql data in HAWQ:
+The meaning of other parameters is given in section [partitioning](#Partitioning).
 
-        gpadmin=# select * from myclass;
-        gpadmin=# select * from myclass where id=2;
 
-# Jdbc Table Fragments
-## intro
-PXF-JDBC plug-in as a  client to access jdbc database.By default, there is
-only one pxf-instance connectied JDBC Table.If the jdbc table data is large,
-you can also use multiple pxf-instance to access the JDBC table by fragments.
+## SELECT queries
 
-## Syntax
-where `<custom-parameters>` can use following partition parameters:
-
-    PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]
-The `PARTITION_BY` parameter indicates which  column to use as the partition column.
-It can be split by colon(':'),the `column_type` current supported : `date|int|enum` .
-The Date format is `yyyy-MM-dd`.
-The `PARTITION_BY` parameter can be null, and there will be only one fragment.
-
-The `RANGE` parameter indicates the range of data to be queried , it can be split by colon(':').
- The range is left-closed, ie: `>= start_value AND < end_value` .
-
-The `INTERVAL` parameter can be split by colon(':'), indicate the interval
- value of one fragment. When `column_type` is `date`,this parameter must
- be split by colon, and `interval_unit` can be `year|month|day`. When
- `column_type` is int, the `interval_unit` can be empty. When `column_type`
- is enum,the `INTERVAL` parameter can be empty.
-
-The syntax examples is :
-
-    * PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'
-    * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1
-    * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad
-
-## Usage
-MySQL Table:
-
-    CREATE TABLE sales (id int primary key, cdate date, amt decimal(10,2),grade varchar(30))
-HAWQ Table:
-
-    CREATE EXTERNAL TABLE sales(id integer,
-                 cdate date,
-                 amt float8,
-                 grade text)
-                 LOCATION ('pxf://localhost:51200/sales'
-                         '?PROFILE=JDBC'
-                         '&JDBC_DRIVER=com.mysql.jdbc.Driver'
-                         '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
-                         '&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year'
-                         )
-                 FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
-At PXF-JDBC plugin,this will generate 2 fragments.Then HAWQ assign these fragments to 2 PXF-instance
-to access jdbc table data.
\ No newline at end of file
+The PXF JDBC plugin allows SELECT queries against external tables.
+
+To perform SELECT queries, create a `READABLE EXTERNAL TABLE` or just an `EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')` in PXF.
+
+The `BATCH_SIZE` parameter is not used in such tables. *However*, if this parameter is present, its value will be checked for correctness (it must be an integer).
+
+
+## INSERT queries
+
+The PXF JDBC plugin allows INSERT queries into external tables. Note that **the plugin does not guarantee consistency for INSERT queries**. Use a staging table in the external database to deal with this.
+
+To perform INSERT queries, create a `WRITABLE EXTERNAL TABLE` with `FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')` in PXF.
+
+The `PARTITION_BY`, `RANGE` and `INTERVAL` parameters in such tables are ignored.
+
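+A minimal sketch of a writable table and an INSERT into it (it reuses the MySQL `myclass` table from the example at the end of this document; the table name `myclass_write`, the credentials and the inserted values are illustrative and must be adjusted to your setup):
+```
+-- a sketch: names, credentials and values below are illustrative
+CREATE WRITABLE EXTERNAL TABLE myclass_write(
+    id integer,
+    name text,
+    degree float8
+)
+LOCATION (
+    'pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
+)
+FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
+
+INSERT INTO myclass_write VALUES (4, 'anna', 88);
+```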
+
+### Batching
+
+INSERT queries can be batched. This may significantly increase performance if batching is supported by the external database.
+
+Batching is enabled by default, and the default batch size is `100` (this is a [recommended](https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754) value). To control this feature, create an external table with the parameter `BATCH_SIZE` set to:
+* `0` or `1`. Batching will be disabled;
+* `integer > 1`. Batches will be of the given size.
+
+Batching must be supported by the JDBC driver of the external database. If the driver does not support batching, the behaviour depends on the `BATCH_SIZE` parameter:
+* `BATCH_SIZE` is not present, or is `0` or `1`: PXF will try to execute the INSERT query without batching;
+* `BATCH_SIZE` is an `integer > 1`: the INSERT query will fail with an appropriate error message.
+
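+For example, a hypothetical LOCATION clause that enables batches of 500 tuples (only the `BATCH_SIZE` parameter differs from the writable table sketched above):
+```
+LOCATION (
+    'pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root&BATCH_SIZE=500'
+)
+```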
+
+## Thread pool
+
+INSERT queries can be processed by multiple threads. This may significantly increase performance if the external database can work with multiple connections simultaneously.
+
+It is recommended to use batching together with a thread pool. In this case, each thread receives data from one (whole) batch and processes it. If a thread pool is used without batching, each thread in the pool will receive exactly one tuple; as a rule, this takes much more time than a usual single-threaded INSERT.
+
+If any of the threads in the pool fails, the user will get an error message. However, if an INSERT fails, some data may still have been inserted into the external database.
+
+To enable a thread pool, create an external table with the parameter `POOL_SIZE` set to:
+* `integer < 1`. The number of threads in the pool will be equal to the number of CPUs in the system;
+* `integer > 1`. The thread pool will consist of the given number of threads;
+* `1`. The thread pool will be disabled.
+
+By default (`POOL_SIZE` is absent), a thread pool is not used.
+
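+For example, a hypothetical LOCATION clause that makes each PXF segment INSERT with four threads (`POOL_SIZE=4` is an arbitrary value; the rest of the clause repeats the sketches above):
+```
+LOCATION (
+    'pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root&BATCH_SIZE=500&POOL_SIZE=4'
+)
+```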
+
+## Partitioning
+
+The PXF JDBC plugin supports simultaneous read access to an external table from multiple PXF segments. This feature is called partitioning.
+
+
+### Syntax
+
+Use the following `<plugin_parameters>` (mentioned above) to activate partitioning:
+
+```
+&PARTITION_BY=<column>:<column_type>
+&RANGE=<start_value>:<end_value>
+[&INTERVAL=<value>[:<unit>]]
+```
+
+* The `PARTITION_BY` parameter indicates which column to use as the partition column. Only one column can be used as the partition column.
+    * The `<column>` is the name of a partition column;
+    * The `<column_type>` is the data type of a partition column. At the moment, the **supported types** are `INT`, `DATE` and `ENUM`.
+
+* The `RANGE` parameter indicates the range of data to be queried.
+    * If the partition type is `ENUM`, the `RANGE` parameter must be a list of values, each of which forms its own fragment;
+    * If the partition type is `INT` or `DATE`, the `RANGE` parameter must be a finite left-closed range ( `... >= start_value AND ... < end_value`);
+    * For `DATE` partitions, the date format must be `yyyy-MM-dd`.
+
+* The `INTERVAL` parameter is **required** for `INT` and `DATE` partitions. It is ignored if `<column_type>` is `ENUM`.
+    * The `<value>` is the size of each fragment (the last one may be made smaller by the plugin);
+    * The `<unit>` **must** be provided if `<column_type>` is `DATE`. `year`, `month` and `day` are supported. This parameter is ignored in case of any other `<column_type>`.
+
+Example partitions:
+* `&PARTITION_BY=id:int&RANGE=42:142&INTERVAL=2`
+* `&PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month`
+* `&PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad`
+
+
+### Mechanism
+
+If partitioning is activated, the SELECT query is split into a set of small queries, each of which is called a *fragment*. All the fragments are processed by separate PXF instances simultaneously. If there are more fragments than PXF instances, some instances will process more than one fragment; if only one PXF instance is available, it will process all the fragments.
+
+Extra query constraints (`WHERE` expressions) are automatically added to each fragment to guarantee that every tuple of data is retrieved from the external database exactly once.
+
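+For instance, with the `cdate:date` partition from the example below, the fragments of `SELECT * FROM sales` are executed against the external database roughly as follows (a sketch; the exact `DATE` wrapping depends on the external DBMS):
+```
+SELECT id, cdate, amt, grade FROM sales WHERE cdate >= DATE('2008-01-01') AND cdate < DATE('2009-01-01')
+SELECT id, cdate, amt, grade FROM sales WHERE cdate >= DATE('2009-01-01') AND cdate < DATE('2010-01-01')
+```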
+
+### Partitioning example
+Consider the following MySQL table:
+```
+CREATE TABLE sales (
+    id int primary key,
+    cdate date,
+    amt decimal(10,2),
+    grade varchar(30)
+)
+```
+and the following HAWQ table:
+```
+CREATE EXTERNAL TABLE sales(
+    id integer,
+    cdate date,
+    amt float8,
+    grade text
+)
+LOCATION ('pxf://sales?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year')
+FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
+```
+
+The PXF JDBC plugin will generate two fragments for a query `SELECT * FROM sales`. Then HAWQ will assign each of them to a separate PXF segment. Each segment will perform the SELECT query, and the first one will get tuples with `cdate` values for year `2008`, while the second will get tuples for year `2009`. Then each PXF segment will send its results back to HAWQ, where they will be "concatenated" and returned.
+
+
+## Examples
+
+The following example shows how to access a MySQL table via JDBC.
+
+Suppose MySQL instance is available at `192.168.200.6:3306`. A table in MySQL is created:
+```
+use demodb;
+create table myclass(
+    id int(4) not null primary key,
+    name varchar(20) not null,
+    degree double(16,2)
+);
+```
+
+Then some data is inserted into MySQL table:
+```
+insert into myclass values(1, 'tom', 90);
+insert into myclass values(2, 'john', 94);
+insert into myclass values(3, 'simon', 79);
+```
+
+The MySQL JDBC driver files (JAR) are copied to `/usr/lib/pxf` on all cluster nodes and a line is added to `/etc/pxf/conf/pxf-public.classpath`:
+```
+/usr/lib/pxf/mysql-connector-java-*.jar
+```
+
+After this, all PXF segments are restarted.
+
+Then a table in HAWQ is created:
+```
+CREATE EXTERNAL TABLE myclass(
+    id integer,
+    name text,
+    degree float8
+)
+LOCATION (
+    'pxf://localhost:51200/demodb.myclass?PROFILE=JDBC&JDBC_DRIVER=com.mysql.jdbc.Driver&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
+)
+FORMAT 'CUSTOM' (
+    FORMATTER='pxfwritable_import'
+);
+```
+
+Finally, a query to a HAWQ external table is made:
+```
+SELECT * FROM myclass;
+SELECT id, name FROM myclass WHERE id = 2;
+```

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
new file mode 100644
index 0000000..2cacafd
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
@@ -0,0 +1,353 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
+import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.sql.Statement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * JDBC tables accessor
+ *
+ * The SELECT queries are processed by {@link java.sql.Statement}
+ *
+ * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
+ * built-in JDBC batches of arbitrary size
+ */
+public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
+    /**
+     * Class constructor
+     */
+    public JdbcAccessor(InputData inputData) throws UserDataException {
+        super(inputData);
+    }
+
+    /**
+     * openForRead() implementation
+     * Create query, open JDBC connection, execute query and store the result into resultSet
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a problem with the connection occurs
+     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     */
+    @Override
+    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
+        if (statementRead != null && !statementRead.isClosed()) {
+            return true;
+        }
+
+        Connection connection = super.getConnection();
+
+        queryRead = buildSelectQuery(connection.getMetaData());
+        statementRead = connection.createStatement();
+        resultSetRead = statementRead.executeQuery(queryRead);
+
+        return true;
+    }
+
+    /**
+     * readNextObject() implementation
+     * Retrieve the next tuple from resultSet and return it
+     *
+     * @throws SQLException if a problem in resultSet occurs
+     */
+    @Override
+    public OneRow readNextObject() throws SQLException {
+        if (resultSetRead.next()) {
+            return new OneRow(resultSetRead);
+        }
+        return null;
+    }
+
+    /**
+     * closeForRead() implementation
+     */
+    @Override
+    public void closeForRead() {
+        JdbcPlugin.closeStatement(statementRead);
+    }
+
+    /**
+     * openForWrite() implementation
+     * Create query template and open JDBC connection
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a problem with the connection occurs
+     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     */
+    @Override
+    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
+        if (statementWrite != null && !statementWrite.isClosed()) {
+            throw new SQLException("The connection to an external database is already open.");
+        }
+
+        Connection connection = super.getConnection();
+
+        queryWrite = buildInsertQuery();
+        statementWrite = super.getPreparedStatement(connection, queryWrite);
+
+        // Process batchSize
+        if (!connection.getMetaData().supportsBatchUpdates()) {
+            if ((batchSizeIsSetByUser) && (batchSize > 1)) {
+                throw new SQLException("The external database does not support batch updates");
+            }
+            else {
+                batchSize = 1;
+            }
+        }
+
+        // Process poolSize
+        if (poolSize < 1) {
+            poolSize = Runtime.getRuntime().availableProcessors();
+            LOG.info(
+                "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize) + ")"
+            );
+        }
+        if (poolSize > 1) {
+            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
+            poolTasks = new LinkedList<>();
+        }
+
+        // Setup WriterCallableFactory
+        writerCallableFactory = new WriterCallableFactory();
+        writerCallableFactory.setPlugin(this);
+        writerCallableFactory.setQuery(queryWrite);
+        writerCallableFactory.setBatchSize(batchSize);
+        if (poolSize == 1) {
+            writerCallableFactory.setStatement(statementWrite);
+        }
+
+        writerCallable = writerCallableFactory.get();
+
+        return true;
+    }
+
+	/**
+     * writeNextObject() implementation
+     *
+     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
+     * Otherwise, execute an INSERT query immediately
+     *
+     * In both cases, a {@link java.sql.PreparedStatement} is used
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
+     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not found
+     * @throws IllegalStateException if writerCallableFactory was not properly initialized
+     * @throws Exception if it happens in writerCallable.call()
+     */
+    @Override
+    public boolean writeNextObject(OneRow row) throws Exception {
+        if (writerCallable == null) {
+            throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)");
+        }
+
+        writerCallable.supply(row);
+        if (writerCallable.isCallRequired()) {
+            if (poolSize > 1) {
+                // Pooling is used. Create new writerCallable
+                poolTasks.add(executorServiceWrite.submit(writerCallable));
+                writerCallable = writerCallableFactory.get();
+            }
+            else {
+                // Pooling is not used
+                writerCallable.call();
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * closeForWrite() implementation
+     *
+     * @throws SQLException if a database access error occurs
+     * @throws Exception if it happens in writerCallable.call() or due to runtime errors in thread pool
+     */
+    @Override
+    public void closeForWrite() throws Exception {
+        if ((statementWrite == null) || (writerCallable == null)) {
+            return;
+        }
+
+        try {
+            if (poolSize > 1) {
+                // Process thread pool
+                Exception firstException = null;
+                for (Future<SQLException> task : poolTasks) {
+                    // We need this construction to ensure that we try to close all connections opened by pool threads
+                    try {
+                        SQLException currentSqlException = task.get();
+                        if (currentSqlException != null) {
+                            if (firstException == null) {
+                                firstException = currentSqlException;
+                            }
+                            LOG.error(
+                                "A SQLException in a pool thread occurred: " + currentSqlException.getClass() + " " + currentSqlException.getMessage()
+                            );
+                        }
+                    }
+                    catch (Exception e) {
+                        // This exception must have been caused by a thread execution error. However, there may be another exception (possibly an SQLException) that happened in one of the threads not examined yet. That is why we do not modify firstException
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                "A runtime exception in a thread pool occurred: " + e.getClass() + " " + e.getMessage()
+                            );
+                        }
+                    }
+                }
+                try {
+                    executorServiceWrite.shutdown();
+                    executorServiceWrite.shutdownNow();
+                }
+                catch (Exception e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("executorServiceWrite.shutdown() or .shutdownNow() threw an exception: " + e.getClass() + " " + e.getMessage());
+                    }
+                }
+                if (firstException != null) {
+                    throw firstException;
+                }
+            }
+
+            // Send data that is left
+            writerCallable.call();
+        }
+        finally {
+            JdbcPlugin.closeStatement(statementWrite);
+        }
+    }
+
+
+    /**
+     * Build SELECT query (with "WHERE" and partition constraints)
+     *
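+     * For example, a query of the following form (the table and column names here are purely illustrative):
+     * {@code SELECT id, cdate FROM sales WHERE id > 5 AND cdate >= DATE('2008-01-01') AND cdate < DATE('2009-01-01')}
+     *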
+     * @return Complete SQL query
+     *
+     * @throws ParseException if the constraints passed in InputData are incorrect
+     * @throws SQLException if the database metadata is invalid
+     */
+    private String buildSelectQuery(DatabaseMetaData databaseMetaData) throws ParseException, SQLException {
+        if (databaseMetaData == null) {
+            throw new IllegalArgumentException("The provided databaseMetaData is null");
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append("SELECT ");
+
+        // Insert columns' names
+        String columnDivisor = "";
+        for (ColumnDescriptor column : columns) {
+            sb.append(columnDivisor);
+            columnDivisor = ", ";
+            sb.append(column.columnName());
+        }
+
+        // Insert the table name
+        sb.append(" FROM ").append(tableName);
+
+        // Insert regular WHERE constraints
+        (new WhereSQLBuilder(inputData)).buildWhereSQL(databaseMetaData.getDatabaseProductName(), sb);
+
+        // Insert partition constraints
+        JdbcPartitionFragmenter.buildFragmenterSql(inputData, databaseMetaData.getDatabaseProductName(), sb);
+
+        return sb.toString();
+    }
+
+    /**
+     * Build INSERT query template (field values are replaced by placeholders '?')
+     *
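+     * For example, a template of the following form (the table and column names here are purely illustrative):
+     * {@code INSERT INTO sales(id, cdate) VALUES (?, ?)}
+     *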
+     * @return SQL query with placeholders instead of actual values
+     */
+    private String buildInsertQuery() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("INSERT INTO ");
+
+        // Insert the table name
+        sb.append(tableName);
+
+        // Insert columns' names
+        sb.append("(");
+        String fieldDivisor = "";
+        for (ColumnDescriptor column : columns) {
+            sb.append(fieldDivisor);
+            fieldDivisor = ", ";
+            sb.append(column.columnName());
+        }
+        sb.append(")");
+
+        sb.append(" VALUES ");
+
+        // Insert values placeholders
+        sb.append("(");
+        fieldDivisor = "";
+        for (int i = 0; i < columns.size(); i++) {
+            sb.append(fieldDivisor);
+            fieldDivisor = ", ";
+            sb.append("?");
+        }
+        sb.append(")");
+
+        return sb.toString();
+    }
+
+    // Read variables
+    private String queryRead = null;
+    private Statement statementRead = null;
+    private ResultSet resultSetRead = null;
+
+    // Write variables
+    private String queryWrite = null;
+    private PreparedStatement statementWrite = null;
+    private WriterCallableFactory writerCallableFactory = null;
+    private WriterCallable writerCallable = null;
+    private ExecutorService executorServiceWrite = null;
+    private List<Future<SQLException> > poolTasks = null;
+
+    // Static variables
+    private static final Log LOG = LogFactory.getLog(JdbcAccessor.class);
+}

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
index 3c56ccb..ad331ed 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
@@ -26,14 +26,12 @@ import org.apache.hawq.pxf.api.LogicalFilter;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.text.ParseException;
 
 /**
- * Uses the filter parser code to build a filter object, either simple - a
- * single {@link BasicFilter} object or a
- * compound - a {@link List} of
- * {@link BasicFilter} objects.
- * The subclass {@link WhereSQLBuilder} will use the filter for
- * generate WHERE statement.
+ * A filter builder. Uses a single {@link BasicFilter} or a {@link List} of {@link BasicFilter} objects.
+ *
+ * The subclass {@link WhereSQLBuilder} will use the result to generate a WHERE statement.
  */
 public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     /**
@@ -41,26 +39,28 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
      * list of such filters.
      *
      * @param filterString the string representation of the filter
-     * @return a single {@link BasicFilter}
-     *         object or a {@link List} of
-     *         {@link BasicFilter} objects.
-     * @throws Exception if parsing the filter failed or filter is not a basic
-     *             filter or list of basic filters
+     * @return a {@link BasicFilter} or a {@link List} of {@link BasicFilter}.
+     * @throws ParseException if parsing the filter failed or filter is not a basic filter or list of basic filters
      */
-    public Object getFilterObject(String filterString) throws Exception {
-        FilterParser parser = new FilterParser(this);
-        Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
-
-        if (!(result instanceof LogicalFilter) && !(result instanceof BasicFilter)
-                && !(result instanceof List)) {
-            throw new Exception("String " + filterString
-                    + " resolved to no filter");
+    public Object getFilterObject(String filterString) throws ParseException {
+        try {
+            FilterParser parser = new FilterParser(this);
+            Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
+
+            if (
+                !(result instanceof LogicalFilter) &&
+                !(result instanceof BasicFilter) &&
+                !(result instanceof List)
+            ) {
+                throw new Exception("'" + filterString + "' could not be resolved to a filter");
+            }
+            return result;
+        }
+        catch (Exception e) {
+            throw new ParseException(e.getMessage(), 0);
         }
-
-        return result;
     }
 
-
     @Override
     public Object build(FilterParser.LogicalOperation op, Object leftOperand, Object rightOperand) {
         return handleLogicalOperation(op, leftOperand, rightOperand);
@@ -72,28 +72,33 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public Object build(FilterParser.Operation opId, Object leftOperand,
                         Object rightOperand) throws Exception {
         // Assume column is on the left
-        return handleSimpleOperations(opId,
-                (FilterParser.ColumnIndex) leftOperand,
-                (FilterParser.Constant) rightOperand);
+        return handleSimpleOperations(
+            opId,
+            (FilterParser.ColumnIndex) leftOperand,
+            (FilterParser.Constant) rightOperand
+        );
     }
 
     @Override
-    public Object build(FilterParser.Operation operation, Object operand) throws Exception {
-        if (operation == FilterParser.Operation.HDOP_IS_NULL || operation == FilterParser.Operation.HDOP_IS_NOT_NULL) {
-            // use null for the constant value of null comparison
+    public Object build(FilterParser.Operation operation, Object operand) throws UnsupportedOperationException {
+        if (
+            operation == FilterParser.Operation.HDOP_IS_NULL ||
+            operation == FilterParser.Operation.HDOP_IS_NOT_NULL
+        ) {
+            // Use null for the constant value of null comparison
             return handleSimpleOperations(operation, (FilterParser.ColumnIndex) operand, null);
-        } else {
-            throw new Exception("Unsupported unary operation " + operation);
+        }
+        else {
+            throw new UnsupportedOperationException("Unsupported unary operation '" + operation + "'");
         }
     }
 
     /*
-     * Handles simple column-operator-constant expressions Creates a special
-     * filter in the case the column is the row key column
+     * Handles simple column-operator-constant expressions.
+     * Creates a special filter in the case the column is the row key column
      */
     private BasicFilter handleSimpleOperations(FilterParser.Operation opId,
                                                FilterParser.ColumnIndex column,
@@ -102,8 +107,7 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     }
 
     /**
-     * Handles AND of already calculated expressions. Currently only AND, in the
-     * future OR can be added
+     * Handles AND of already calculated expressions.
      *
      * Four cases here:
      * <ol>
@@ -135,7 +139,6 @@ public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
     }
 
     private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object leftOperand, Object rightOperand) {
-
         List<Object> result = new LinkedList<>();
 
         result.add(leftOperand);

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
index 914b7d9..4971269 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
@@ -27,296 +27,311 @@ import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
 import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
 
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
 
 /**
- * Fragmenter class for JDBC data resources.
- *
- * Extends the {@link Fragmenter} abstract class, with the purpose of transforming
- * an input data path  (an JDBC Database table name  and user request parameters)  into a list of regions
- * that belong to this table.
- * <br>
- * The parameter Patterns<br>
- * There are three  parameters,  the format is as follows:<br>
- * <pre>
- * <code>PARTITION_BY=column_name:column_type&amp;RANGE=start_value[:end_value]&amp;INTERVAL=interval_num[:interval_unit]</code>
- * </pre>
- * The <code>PARTITION_BY</code> parameter can be split by colon(':'),the <code>column_type</code> current supported : <code>date,int,enum</code> .
- * The Date format is 'yyyy-MM-dd'. <br>
- * The <code>RANGE</code> parameter can be split by colon(':') ,used to identify the starting range of each fragment.
- * The range is left-closed, ie:<code> '&gt;= start_value AND &lt; end_value' </code>.If the <code>column_type</code> is <code>int</code>,
- * the <code>end_value</code> can be empty. If the <code>column_type</code>is <code>enum</code>,the parameter <code>RANGE</code> can be empty. <br>
- * The <code>INTERVAL</code> parameter can be split by colon(':'), indicate the interval value of one fragment.
- * When <code>column_type</code> is <code>date</code>,this parameter must be split by colon, and <code>interval_unit</code> can be <code>year,month,day</code>.
- * When <code>column_type</code> is <code>int</code>, the <code>interval_unit</code> can be empty.
- * When <code>column_type</code> is <code>enum</code>,the <code>INTERVAL</code> parameter can be empty.
- * <br>
- * <p>
- * The syntax examples is :<br>
- * <code>PARTITION_BY=createdate:date&amp;RANGE=2008-01-01:2010-01-01&amp;INTERVAL=1:month'</code> <br>
- * <code>PARTITION_BY=year:int&amp;RANGE=2008:2010&amp;INTERVAL=1</code> <br>
- * <code>PARTITION_BY=grade:enum&amp;RANGE=excellent:good:general:bad</code>
- * </p>
+ * JDBC fragmenter
  *
+ * Splits the query to allow multiple simultaneous SELECTs
  */
 public class JdbcPartitionFragmenter extends Fragmenter {
-    String[] partitionBy = null;
-    String[] range = null;
-    String[] interval = null;
-    PartitionType partitionType = null;
-    String partitionColumn = null;
-    IntervalType intervalType = null;
-    int intervalNum = 1;
+    /**
+     * Insert fragment constraints into the SQL query.
+     *
+     * @param inputData InputData of the fragment
+     * @param dbName Database name (affects the behaviour for DATE partitions)
+     * @param query SQL query to insert constraints into. The query may already contain other WHERE clauses
+     */
+    public static void buildFragmenterSql(InputData inputData, String dbName, StringBuilder query) {
+        if (inputData.getUserProperty("PARTITION_BY") == null) {
+            return;
+        }
 
-    //when partitionType is DATE,it is valid
-    Calendar rangeStart = null;
-    Calendar rangeEnd = null;
+        byte[] meta = inputData.getFragmentMetadata();
+        if (meta == null) {
+            return;
+        }
+        String[] partitionBy = inputData.getUserProperty("PARTITION_BY").split(":");
+        String partitionColumn = partitionBy[0];
+        PartitionType partitionType = PartitionType.typeOf(partitionBy[1]);
+        DbProduct dbProduct = DbProduct.getDbProduct(dbName);
 
+        if (!query.toString().contains("WHERE")) {
+            query.append(" WHERE ");
+        }
+        else {
+            query.append(" AND ");
+        }
 
-    enum PartitionType {
-        DATE,
-        INT,
-        ENUM;
+        switch (partitionType) {
+            case DATE: {
+                byte[][] newb = ByteUtil.splitBytes(meta);
+                Date fragStart = new Date(ByteUtil.toLong(newb[0]));
+                Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
 
-        public static PartitionType getType(String str) {
-            return valueOf(str.toUpperCase());
-        }
-    }
+                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+                query.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
+                query.append(" AND ");
+                query.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
 
-    enum IntervalType {
-        DAY,
-        MONTH,
-        YEAR;
+                break;
+            }
+            case INT: {
+                byte[][] newb = ByteUtil.splitBytes(meta);
+                long fragStart = ByteUtil.toLong(newb[0]);
+                long fragEnd = ByteUtil.toLong(newb[1]);
 
-        public static IntervalType type(String str) {
-            return valueOf(str.toUpperCase());
+                query.append(partitionColumn).append(" >= ").append(fragStart);
+                query.append(" AND ");
+                query.append(partitionColumn).append(" < ").append(fragEnd);
+                break;
+            }
+            case ENUM: {
+                query.append(partitionColumn).append(" = '").append(new String(meta)).append("'");
+                break;
+            }
         }
     }
 
     /**
-     * Constructor for JdbcPartitionFragmenter.
+     * Class constructor.
      *
-     * @param inConf input data such as which Jdbc table to scan
-     * @throws UserDataException  if the request parameter is malformed
+     * @param inputData PXF InputData
+     * @throws UserDataException if the request parameter is malformed
      */
-    public JdbcPartitionFragmenter(InputData inConf) throws UserDataException {
-        super(inConf);
-        if (inConf.getUserProperty("PARTITION_BY") == null)
+    public JdbcPartitionFragmenter(InputData inputData) throws UserDataException {
+        super(inputData);
+        if (inputData.getUserProperty("PARTITION_BY") == null) {
             return;
+        }
+
+        // PARTITION_BY
         try {
-            partitionBy = inConf.getUserProperty("PARTITION_BY").split(":");
-            partitionColumn = partitionBy[0];
-            partitionType = PartitionType.getType(partitionBy[1]);
-        } catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e1) {
-            throw new UserDataException("The parameter 'PARTITION_BY' invalid, the pattern is 'column_name:date|int|enum'");
+            partitionType = PartitionType.typeOf(
+                inputData.getUserProperty("PARTITION_BY").split(":")[1]
+            );
+        }
+        catch (IllegalArgumentException | ArrayIndexOutOfBoundsException ex) {
+            throw new UserDataException("The parameter 'PARTITION_BY' is invalid. The pattern is '<column_name>:date|int|enum'");
         }
 
-        //parse and validate parameter-RANGE
+        // RANGE
         try {
-            String rangeStr = inConf.getUserProperty("RANGE");
+            String rangeStr = inputData.getUserProperty("RANGE");
             if (rangeStr != null) {
                 range = rangeStr.split(":");
-                if (range.length == 1 && partitionType != PartitionType.ENUM)
-                    throw new UserDataException("The parameter 'RANGE' does not specify '[:end_value]'");
-            } else
+                if (range.length == 1 && partitionType != PartitionType.ENUM) {
+                    throw new UserDataException("The parameter 'RANGE' must specify ':<end_value>' for this PARTITION_TYPE");
+                }
+            }
+            else {
                 throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'");
-        } catch (IllegalArgumentException e1) {
-            throw new UserDataException("The parameter 'RANGE' invalid, the pattern is 'start_value[:end_value]'");
+            }
+
+            if (partitionType == PartitionType.DATE) {
+                // Parse DATE partition type values
+                try {
+                    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+                    rangeDateStart = Calendar.getInstance();
+                    rangeDateStart.setTime(df.parse(range[0]));
+                    rangeDateEnd = Calendar.getInstance();
+                    rangeDateEnd.setTime(df.parse(range[1]));
+                }
+                catch (ParseException e) {
+                    throw new UserDataException("The parameter 'RANGE' has invalid date format. The correct format is 'yyyy-MM-dd'");
+                }
+            }
+            else if (partitionType == PartitionType.INT) {
+                // Parse INT partition type values
+                try {
+                    rangeIntStart = Long.parseLong(range[0]);
+                    rangeIntEnd = Long.parseLong(range[1]);
+                }
+                catch (NumberFormatException e) {
+                    throw new UserDataException("The parameter 'RANGE' is invalid. Both range boundaries must be integers");
+                }
+            }
+        }
+        catch (IllegalArgumentException ex) {
+            throw new UserDataException("The parameter 'RANGE' is invalid. The pattern is '<start_value>[:<end_value>]'");
         }
 
-        //parse and validate parameter-INTERVAL
+        // INTERVAL
         try {
-            String intervalStr = inConf.getUserProperty("INTERVAL");
+            String intervalStr = inputData.getUserProperty("INTERVAL");
             if (intervalStr != null) {
-                interval = intervalStr.split(":");
-                intervalNum = Integer.parseInt(interval[0]);
-                if (interval.length > 1)
-                    intervalType = IntervalType.type(interval[1]);
-                if (interval.length == 1 && partitionType == PartitionType.DATE)
-                    throw new UserDataException("The parameter 'INTERVAL' does not specify unit [:year|month|day]");
-            } else if (partitionType != PartitionType.ENUM)
-                throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY'");
-            if (intervalNum < 1)
-                throw new UserDataException("The parameter 'INTERVAL' must > 1, but actual is '" + intervalNum + "'");
-        } catch (IllegalArgumentException e1) {
-            throw new UserDataException("The parameter 'INTERVAL' invalid, the pattern is 'interval_num[:interval_unit]'");
-        }
+                String[] interval = intervalStr.split(":");
+                try {
+                    intervalNum = Long.parseLong(interval[0]);
+                    if (intervalNum < 1) {
+                        throw new UserDataException("The '<interval_num>' in parameter 'INTERVAL' must be at least 1, but actual is " + intervalNum);
+                    }
+                }
+                catch (NumberFormatException ex) {
+                    throw new UserDataException("The '<interval_num>' in parameter 'INTERVAL' must be an integer");
+                }
 
-        //parse any date values
-        try {
-            if (partitionType == PartitionType.DATE) {
-                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
-                rangeStart = Calendar.getInstance();
-                rangeStart.setTime(df.parse(range[0]));
-                rangeEnd = Calendar.getInstance();
-                rangeEnd.setTime(df.parse(range[1]));
+                // Intervals of type DATE
+                if (interval.length > 1) {
+                    intervalType = IntervalType.typeOf(interval[1]);
+                }
+                if (interval.length == 1 && partitionType == PartitionType.DATE) {
+                    throw new UserDataException("The parameter 'INTERVAL' must specify unit (':year|month|day') for the PARTITION_TYPE = 'DATE'");
+                }
+            }
+            else if (partitionType != PartitionType.ENUM) {
+                throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY' for this PARTITION_TYPE");
             }
-        } catch (ParseException e) {
-            throw new UserDataException("The parameter 'RANGE' has invalid date format. Expected format is 'YYYY-MM-DD'");
+        }
+        catch (IllegalArgumentException ex) {
+            throw new UserDataException("The parameter 'INTERVAL' is invalid. The pattern is '<interval_num>[:<interval_unit>]'");
         }
     }
 
     /**
-     * Returns statistics for Jdbc table. Currently it's not implemented.
      * @throws UnsupportedOperationException ANALYZE for Jdbc plugin is not supported
      */
     @Override
     public FragmentsStats getFragmentsStats() throws UnsupportedOperationException {
-        throw new UnsupportedOperationException("ANALYZE for Jdbc plugin is not supported");
+        throw new UnsupportedOperationException("ANALYZE for JDBC plugin is not supported");
     }
 
     /**
-     * Returns list of fragments containing all of the
-     * Jdbc table data.
+     * getFragments() implementation
      *
-     * @return a list of fragments
-     * @throws Exception if assign host error
+     * @return a list of fragments to be passed to PXF segments
      */
     @Override
-    public List<Fragment> getFragments() throws Exception {
+    public List<Fragment> getFragments() {
         if (partitionType == null) {
-            byte[] fragmentMetadata = null;
-            byte[] userData = null;
-            Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+            // No partition case
+            Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, null);
             fragments.add(fragment);
-            return prepareHosts(fragments);
+            return fragments;
         }
+
         switch (partitionType) {
             case DATE: {
-                int currInterval = intervalNum;
+                Calendar fragStart = rangeDateStart;
 
-                Calendar fragStart = rangeStart;
-                while (fragStart.before(rangeEnd)) {
-                    Calendar fragEnd = (Calendar) fragStart.clone();
+                while (fragStart.before(rangeDateEnd)) {
+                    // Calculate a new fragment
+                    Calendar fragEnd = (Calendar)fragStart.clone();
                     switch (intervalType) {
                         case DAY:
-                            fragEnd.add(Calendar.DAY_OF_MONTH, currInterval);
+                            fragEnd.add(Calendar.DAY_OF_MONTH, (int)intervalNum);
                             break;
                         case MONTH:
-                            fragEnd.add(Calendar.MONTH, currInterval);
+                            fragEnd.add(Calendar.MONTH, (int)intervalNum);
                             break;
                         case YEAR:
-                            fragEnd.add(Calendar.YEAR, currInterval);
+                            fragEnd.add(Calendar.YEAR, (int)intervalNum);
                             break;
                     }
-                    if (fragEnd.after(rangeEnd))
-                        fragEnd = (Calendar) rangeEnd.clone();
+                    if (fragEnd.after(rangeDateEnd))
+                        fragEnd = (Calendar)rangeDateEnd.clone();
 
-                    //make metadata of this fragment , converts the date to a millisecond,then get bytes.
+                    // Convert to byte[]
                     byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis());
                     byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis());
                     byte[] fragmentMetadata = ByteUtil.mergeBytes(msStart, msEnd);
 
-                    byte[] userData = new byte[0];
-                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+                    // Write fragment
+                    Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
                     fragments.add(fragment);
 
-                    //continue next fragment.
+                    // Prepare for the next fragment
                     fragStart = fragEnd;
                 }
                 break;
             }
             case INT: {
-                int rangeStart = Integer.parseInt(range[0]);
-                int rangeEnd = Integer.parseInt(range[1]);
-                int currInterval = intervalNum;
+                long fragStart = rangeIntStart;
 
-                //validate : curr_interval > 0
-                int fragStart = rangeStart;
-                while (fragStart < rangeEnd) {
-                    int fragEnd = fragStart + currInterval;
-                    if (fragEnd > rangeEnd) fragEnd = rangeEnd;
+                while (fragStart < rangeIntEnd) {
+                    // Calculate a new fragment
+                    long fragEnd = fragStart + intervalNum;
+                    if (fragEnd > rangeIntEnd) {
+                        fragEnd = rangeIntEnd;
+                    }
 
+                    // Convert to byte[]
                     byte[] bStart = ByteUtil.getBytes(fragStart);
                     byte[] bEnd = ByteUtil.getBytes(fragEnd);
                     byte[] fragmentMetadata = ByteUtil.mergeBytes(bStart, bEnd);
 
-                    byte[] userData = new byte[0];
-                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+                    // Write fragment
+                    Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
                     fragments.add(fragment);
 
-                    //continue next fragment.
-                    fragStart = fragEnd;// + 1;
+                    // Prepare for the next fragment
+                    fragStart = fragEnd;
                 }
                 break;
             }
-            case ENUM:
+            case ENUM: {
                 for (String frag : range) {
                     byte[] fragmentMetadata = frag.getBytes();
-                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, new byte[0]);
+                    Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
                     fragments.add(fragment);
                 }
                 break;
+            }
         }
 
-        return prepareHosts(fragments);
+        return fragments;
     }
 
-    /**
-     * For each fragment , assigned a host address.
-     * In Jdbc Plugin, 'replicas' is the host address of the PXF engine that is running, not the database engine.
-     * Since the other PXF host addresses can not be probed, only the host name of the current PXF engine is returned.
-     * @param fragments a list of fragments
-     * @return a list of fragments that assigned hosts.
-     * @throws UnknownHostException if InetAddress.getLocalHost error.
-     */
-    public static List<Fragment> prepareHosts(List<Fragment> fragments) throws UnknownHostException {
-        for (Fragment fragment : fragments) {
-            String pxfHost = InetAddress.getLocalHost().getHostAddress();
-            String[] hosts = new String[]{pxfHost};
-            fragment.setReplicas(hosts);
-        }
+    // Partition parameters (filled by class constructor)
+    private String[] range = null;
+    private PartitionType partitionType = null;
+    private long intervalNum;
 
-        return fragments;
-    }
+    // Partition parameters for INT partitions (filled by class constructor)
+    private long rangeIntStart;
+    private long rangeIntEnd;
 
-    public String buildFragmenterSql(String dbName, String originSql) {
-        byte[] meta = inputData.getFragmentMetadata();
-        if (meta == null)
-            return originSql;
+    // Partition parameters for DATE partitions (filled by class constructor)
+    private IntervalType intervalType;
+    private Calendar rangeDateStart;
+    private Calendar rangeDateEnd;
 
-        DbProduct dbProduct = DbProduct.getDbProduct(dbName);
+    private static enum PartitionType {
+        DATE,
+        INT,
+        ENUM;
 
-        StringBuilder sb = new StringBuilder(originSql);
-        if (!originSql.contains("WHERE"))
-            sb.append(" WHERE 1=1 ");
+        public static PartitionType typeOf(String str) {
+            return valueOf(str.toUpperCase());
+        }
+    }
 
-        sb.append(" AND ");
-        switch (partitionType) {
-            case DATE: {
-                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
-                //parse metadata of this fragment
-                //validate: the length of metadata == 16 (long)
-                byte[][] newb = ByteUtil.splitBytes(meta, 8);
-                Date fragStart = new Date(ByteUtil.toLong(newb[0]));
-                Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
+    private static enum IntervalType {
+        DAY,
+        MONTH,
+        YEAR;
 
-                sb.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
-                sb.append(" AND ");
-                sb.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
+        public static IntervalType typeOf(String str) {
+            return valueOf(str.toUpperCase());
+        }
+    }
 
-                break;
-            }
-            case INT: {
-                //validate: the length of metadata == 8 (int)
-                byte[][] newb = ByteUtil.splitBytes(meta, 4);
-                int fragStart = ByteUtil.toInt(newb[0]);
-                int fragEnd = ByteUtil.toInt(newb[1]);
-                sb.append(partitionColumn).append(" >= ").append(fragStart);
-                sb.append(" AND ");
-                sb.append(partitionColumn).append(" < ").append(fragEnd);
-                break;
-            }
-            case ENUM:
-                sb.append(partitionColumn).append("='").append(new String(meta)).append("'");
-                break;
+    // A PXF engine to use as a host for fragments
+    private static final String[] pxfHosts;
+    static {
+        String[] localhost = {"localhost"};
+        try {
+            localhost[0] = InetAddress.getLocalHost().getHostAddress();
+        }
+        catch (UnknownHostException ex) {
+            // Fall back to the 'localhost' value set above
         }
-        return sb.toString();
+        pxfHosts = localhost;
     }
 }
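
The partition parameters keep their previous shape: PARTITION_BY=<column>:<date|int|enum>, RANGE=<start>[:<end>], INTERVAL=<num>[:<year|month|day>], for example PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1 or PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month. A standalone sketch of how an INT partition is split into fragments and how buildFragmenterSql() renders each fragment's constraint (illustrative code, not the plugin classes themselves):

    // Sketch of the INT partitioning scheme used by getFragments() and
    // buildFragmenterSql(), assuming PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1.
    public final class IntPartitionSketch {
        public static void main(String[] args) {
            final String column = "year";
            final long rangeStart = 2008, rangeEnd = 2010, interval = 1;

            long fragStart = rangeStart;
            while (fragStart < rangeEnd) {
                long fragEnd = Math.min(fragStart + interval, rangeEnd);
                // Each fragment carries [fragStart, fragEnd) in its metadata; the
                // accessor later appends this constraint to its SELECT query:
                System.out.println(column + " >= " + fragStart + " AND " + column + " < " + fragEnd);
                fragStart = fragEnd;
            }
            // DATE partitions follow the same pattern with values wrapped by the
            // DB-specific wrapDate(); ENUM partitions emit "column = 'value'" instead.
        }
    }

The sketch prints "year >= 2008 AND year < 2009" and "year >= 2009 AND year < 2010", one constraint per fragment.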

http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
index c0af405..6715508 100644
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
@@ -19,95 +19,209 @@ package org.apache.hawq.pxf.plugins.jdbc;
  * under the License.
  */
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Plugin;
 
-import java.sql.*;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
- * This class resolves the jdbc connection parameter and manages the opening and closing of the jdbc connection.
- * Implemented subclasses: {@link JdbcReadAccessor}.
+ * JDBC tables plugin (base class)
  *
+ * Implemented subclasses: {@link JdbcAccessor}, {@link JdbcResolver}.
  */
 public class JdbcPlugin extends Plugin {
-    private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
-
-    //jdbc connection parameters
-    protected String jdbcDriver = null;
-    protected String dbUrl = null;
-    protected String user = null;
-    protected String pass = null;
-    protected String tblName = null;
-    protected int batchSize = 100;
-
-    //jdbc connection
-    protected Connection dbConn = null;
-    //database type, from DatabaseMetaData.getDatabaseProductName()
-    protected String dbProduct = null;
-
     /**
-     * parse input data
+     * Class constructor
+     *
+     * @param input {@link InputData} provided by PXF
      *
-     * @param input the input data
-     * @throws UserDataException if the request parameter is malformed
+     * @throws UserDataException if one of the required request parameters is not set
      */
     public JdbcPlugin(InputData input) throws UserDataException {
         super(input);
+
         jdbcDriver = input.getUserProperty("JDBC_DRIVER");
+        if (jdbcDriver == null) {
+            throw new UserDataException("JDBC_DRIVER is a required parameter");
+        }
+
         dbUrl = input.getUserProperty("DB_URL");
-        user = input.getUserProperty("USER");
-        pass = input.getUserProperty("PASS");
-        String strBatch = input.getUserProperty("BATCH_SIZE");
-        if (strBatch != null) {
-            batchSize = Integer.parseInt(strBatch);
+        if (dbUrl == null) {
+            throw new UserDataException("DB_URL is a required parameter");
         }
 
-        if (jdbcDriver == null) {
-            throw new UserDataException("JDBC_DRIVER must be set");
+        tableName = input.getDataSource();
+        if (tableName == null) {
+            throw new UserDataException("Data source must be provided");
         }
-        if (dbUrl == null) {
-            throw new UserDataException("DB_URL must be set(read)");
+        /*
+        At the moment, when writing into some table, the table name is
+        concatenated with a special string that is necessary to write into HDFS.
+        However, a raw table name is necessary in case of JDBC.
+        The correct table name is extracted here.
+        */
+        Matcher matcher = tableNamePattern.matcher(tableName);
+        if (matcher.matches()) {
+            inputData.setDataSource(matcher.group(1));
+            tableName = input.getDataSource();
         }
 
-        tblName = input.getDataSource();
-        if (tblName == null) {
-            throw new UserDataException("TABLE_NAME must be set as DataSource.");
+        columns = inputData.getTupleDescription();
+        if (columns == null) {
+            throw new UserDataException("Tuple description must be provided");
+        }
+
+        // This parameter is not required. The default value is null
+        user = input.getUserProperty("USER");
+        if (user != null) {
+            pass = input.getUserProperty("PASS");
+        }
+
+        // This parameter is not required. The default value is DEFAULT_BATCH_SIZE
+        String batchSizeRaw = input.getUserProperty("BATCH_SIZE");
+        if (batchSizeRaw != null) {
+            try {
+                batchSize = Integer.parseInt(batchSizeRaw);
+                if (batchSize < 0) {
+                    throw new NumberFormatException();
+                } else if (batchSize == 0) {
+                    batchSize = 1;
+                }
+                batchSizeIsSetByUser = true;
+            }
+            catch (NumberFormatException e) {
+                throw new UserDataException("BATCH_SIZE is incorrect: must be a non-negative integer");
+            }
         }
-    }
 
-    public String getTableName() {
-        return tblName;
+        // This parameter is not required. The default value is 1
+        String poolSizeRaw = input.getUserProperty("POOL_SIZE");
+        if (poolSizeRaw != null) {
+            try {
+                poolSize = Integer.parseInt(poolSizeRaw);
+            }
+            catch (NumberFormatException e) {
+                throw new UserDataException("POOL_SIZE is incorrect: must be an integer");
+            }
+        }
     }
 
-    protected Connection openConnection() throws ClassNotFoundException, SQLException {
+    /**
+     * Open a new JDBC connection
+     *
+     * @return a new JDBC Connection to the external database
+     *
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a connection problem occurs
+     */
+    public Connection getConnection() throws ClassNotFoundException, SQLException, SQLTimeoutException {
+        Connection connection;
         if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Open JDBC: driver=%s,url=%s,user=%s,pass=%s,table=%s",
-                    jdbcDriver, dbUrl, user, pass, tblName));
-        }
-        if (dbConn == null || dbConn.isClosed()) {
-            Class.forName(jdbcDriver);
             if (user != null) {
-                dbConn = DriverManager.getConnection(dbUrl, user, pass);
-            } else {
-                dbConn = DriverManager.getConnection(dbUrl);
+                LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, user=%s, pass=%s, table=%s",
+                    jdbcDriver, dbUrl, user, pass, tableName));
+            }
+            else {
+                LOG.debug(String.format("Open JDBC connection: driver=%s, url=%s, table=%s",
+                    jdbcDriver, dbUrl, tableName));
             }
-            DatabaseMetaData meta = dbConn.getMetaData();
-            dbProduct = meta.getDatabaseProductName();
         }
-        return dbConn;
+        Class.forName(jdbcDriver);
+        if (user != null) {
+            connection = DriverManager.getConnection(dbUrl, user, pass);
+        }
+        else {
+            connection = DriverManager.getConnection(dbUrl);
+        }
+        return connection;
     }
 
-    protected void closeConnection() {
+    /**
+     * Close a JDBC connection
+     */
+    public static void closeConnection(Connection connection) {
         try {
-            if (dbConn != null) {
-                dbConn.close();
-                dbConn = null;
+            if ((connection != null) && (!connection.isClosed())) {
+                if ((connection.getMetaData().supportsTransactions()) && (!connection.getAutoCommit())) {
+                    connection.commit();
+                }
+                connection.close();
             }
-        } catch (SQLException e) {
-            LOG.error("Close db connection error . ", e);
         }
+        catch (SQLException e) {
+            LOG.error("JDBC connection close error", e);
+        }
+    }
+
+    /**
+     * Prepare a JDBC PreparedStatement
+     *
+     * @throws ClassNotFoundException if the JDBC driver was not found
+     * @throws SQLException if a database access error occurs
+     * @throws SQLTimeoutException if a connection problem occurs
+     */
+    public PreparedStatement getPreparedStatement(Connection connection, String query) throws SQLException, SQLTimeoutException, ClassNotFoundException {
+        if ((connection == null) || (query == null)) {
+            throw new IllegalArgumentException("The provided query or connection is null");
+        }
+        if (connection.getMetaData().supportsTransactions()) {
+            connection.setAutoCommit(false);
+        }
+        return connection.prepareStatement(query);
     }
+
+    /**
+     * Close a JDBC Statement (and the connection it is based on)
+     */
+    public static void closeStatement(Statement statement) {
+        if (statement == null) {
+            return;
+        }
+        Connection connection = null;
+        try {
+            if (!statement.isClosed()) {
+                connection = statement.getConnection();
+                statement.close();
+            }
+        }
+        catch (Exception e) {
+            // Ignore and proceed to closing the connection below
+        }
+        closeConnection(connection);
+    }
+
+    // JDBC parameters
+    protected String jdbcDriver = null;
+    protected String dbUrl = null;
+    protected String user = null;
+    protected String pass = null;
+
+    protected String tableName = null;
+
+    // '100' is a recommended value: https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754
+    public static final int DEFAULT_BATCH_SIZE = 100;
+    // After argument parsing, this value is guaranteed to be >= 1
+    protected int batchSize = DEFAULT_BATCH_SIZE;
+    protected boolean batchSizeIsSetByUser = false;
+
+    protected int poolSize = 1;
+
+    // Columns description
+    protected ArrayList<ColumnDescriptor> columns = null;
+
+    private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
+
+    // At the moment, when writing into some table, the table name is concatenated
+    // with a special string that is necessary to write into HDFS. However, a raw
+    // table name is necessary in case of JDBC. This Pattern extracts the correct
+    // table name from the given InputData.dataSource
+    private static final Pattern tableNamePattern = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
 }
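
The tableNamePattern regular expression above strips the HDFS-style suffix appended to the data source on write. A small standalone illustration (the sample data source string is hypothetical and only mimics the expected shape):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public final class TableNameSketch {
        // Same pattern as JdbcPlugin.tableNamePattern
        private static final Pattern TABLE_NAME_PATTERN =
            Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");

        public static void main(String[] args) {
            String dataSource = "/sales/1536350928-0000000123_0"; // hypothetical write-path data source
            Matcher matcher = TABLE_NAME_PATTERN.matcher(dataSource);
            if (matcher.matches()) {
                System.out.println(matcher.group(1)); // prints: sales
            }
        }
    }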

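A minimal sketch of the intended connection/statement lifecycle built from the helpers above (getConnection, getPreparedStatement, closeStatement, closeConnection); "plugin" stands for an already constructed JdbcPlugin subclass such as a JdbcAccessor:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin;

    public final class JdbcLifecycleSketch {
        public static void execute(JdbcPlugin plugin, String query) throws Exception {
            Connection connection = plugin.getConnection(); // loads the driver and opens the connection
            PreparedStatement statement = null;
            try {
                // Turns off autocommit when the database supports transactions
                statement = plugin.getPreparedStatement(connection, query);
                statement.execute();
            }
            finally {
                if (statement != null) {
                    // Commits pending work (when applicable) and closes the connection as well
                    JdbcPlugin.closeStatement(statement);
                } else {
                    JdbcPlugin.closeConnection(connection);
                }
            }
        }
    }
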
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
deleted file mode 100644
index 2ca9a94..0000000
--- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.hawq.pxf.plugins.jdbc;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.hawq.pxf.api.OneRow;
-import org.apache.hawq.pxf.api.ReadAccessor;
-import org.apache.hawq.pxf.api.UserDataException;
-import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
-import org.apache.hawq.pxf.api.utilities.InputData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.sql.*;
-import java.util.ArrayList;
-
-/**
- * Accessor for Jdbc tables. The accessor will open and read a partition belonging
- * to a Jdbc table. JdbcReadAccessor generates and executes SQL from filter and
- * fragmented information, uses {@link JdbcReadResolver } to read the ResultSet, and generates
- * the data type - List {@link OneRow} that HAWQ needs.
- */
-public class JdbcReadAccessor extends JdbcPlugin implements ReadAccessor {
-    private static final Log LOG = LogFactory.getLog(JdbcReadAccessor.class);
-
-    WhereSQLBuilder filterBuilder = null;
-    private ColumnDescriptor keyColumn = null;
-
-    private String querySql = null;
-    private Statement statement = null;
-    private ResultSet resultSet = null;
-
-    public JdbcReadAccessor(InputData input) throws UserDataException {
-        super(input);
-        filterBuilder = new WhereSQLBuilder(inputData);
-
-        //buid select statement (not contain where statement)
-        ArrayList<ColumnDescriptor> columns = input.getTupleDescription();
-        StringBuilder sb = new StringBuilder();
-        sb.append("SELECT ");
-        for (int i = 0; i < columns.size(); i++) {
-            ColumnDescriptor column = columns.get(i);
-            if (column.isKeyColumn())
-                keyColumn = column;
-            if (i > 0) sb.append(",");
-            sb.append(column.columnName());
-        }
-        sb.append(" FROM ").append(getTableName());
-        querySql = sb.toString();
-    }
-
-    /**
-     * open db connection, execute query sql
-     */
-    @Override
-    public boolean openForRead() throws Exception {
-        if (statement != null && !statement.isClosed())
-            return true;
-        super.openConnection();
-
-        statement = dbConn.createStatement();
-
-        resultSet = executeQuery(querySql);
-
-        return true;
-    }
-
-    public ResultSet executeQuery(String sql) throws Exception {
-        String query = sql;
-        if (inputData.hasFilter()) {
-            //parse filter string , build where statement
-            String whereSql = filterBuilder.buildWhereSQL(dbProduct);
-
-            if (whereSql != null) {
-                query = query + " WHERE " + whereSql;
-            }
-        }
-
-        //according to the fragment information, rewriting sql
-        JdbcPartitionFragmenter fragmenter = new JdbcPartitionFragmenter(inputData);
-        query = fragmenter.buildFragmenterSql(dbProduct, query);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("executeQuery: " + query);
-        }
-
-        return statement.executeQuery(query);
-    }
-
-    @Override
-    public OneRow readNextObject() throws Exception {
-        if (resultSet.next()) {
-            return new OneRow(null, resultSet);
-        }
-        return null;
-    }
-
-    @Override
-    public void closeForRead() throws Exception {
-        if (statement != null && !statement.isClosed()) {
-            statement.close();
-            statement = null;
-        }
-        super.closeConnection();
-    }
-}
\ No newline at end of file