You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/06/24 09:10:49 UTC

[shardingsphere] 07/08: Implements openGauss batch bind protocol (#10850)

This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch opengauss_adapt
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

commit f50959a83001d6e2b8bda3df1b4b3924d4f7018c
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Thu Jun 17 23:53:46 2021 +0800

    Implements openGauss batch bind protocol (#10850)
    
    * Implements openGauss BatchBind
    
    * Add javadoc
---
 .../command/PostgreSQLCommandPacketFactory.java    |   3 +
 .../command/PostgreSQLCommandPacketType.java       |   2 +
 .../binary/bind/OpenGaussComBatchBindPacket.java   | 179 +++++++++++++++++++++
 .../command/PostgreSQLCommandExecutorFactory.java  |   4 +
 .../binary/bind/OpenGaussComBatchBindExecutor.java | 171 ++++++++++++++++++++
 5 files changed, 359 insertions(+)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java
index 070bc37..9832864 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketFactory.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.db.protocol.postgresql.packet.command;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.admin.PostgreSQLUnsupportedCommandPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.describe.PostgreSQLComDescribePacket;
@@ -52,6 +53,8 @@ public final class PostgreSQLCommandPacketFactory {
                 return new PostgreSQLComParsePacket(payload);
             case BIND_COMMAND:
                 return new PostgreSQLComBindPacket(payload, connectionId);
+            case BATCH_BIND_COMMAND:
+                return new OpenGaussComBatchBindPacket(payload, connectionId);
             case DESCRIBE_COMMAND:
                 return new PostgreSQLComDescribePacket(payload);
             case EXECUTE_COMMAND:
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
index 014ad2d..f59f684 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
@@ -37,6 +37,8 @@ public enum PostgreSQLCommandPacketType implements CommandPacketType, PostgreSQL
     
     BIND_COMMAND('B'),
     
+    BATCH_BIND_COMMAND('U'),
+    
     DESCRIBE_COMMAND('D'),
     
     EXECUTE_COMMAND('E'),
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
new file mode 100644
index 0000000..8c290e9
--- /dev/null
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/binary/bind/OpenGaussComBatchBindPacket.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind;
+
+import com.google.common.collect.Lists;
+import lombok.Getter;
+import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
+import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.protocol.PostgreSQLBinaryProtocolValue;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.protocol.PostgreSQLBinaryProtocolValueFactory;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Batch bind packet for openGauss; the constructor parses the statement id, format codes and bound parameter values from the payload.
+ */
+@Getter
+public final class OpenGaussComBatchBindPacket extends PostgreSQLCommandPacket {
+    
+    private final String statementId;
+    
+    private final String sql; // null when no statement is registered under statementId for the connection
+    
+    private final List<Integer> resultFormatCodes;
+    
+    private final List<List<Object>> parameters; // the flat list of parsed values split into consecutive groups
+    
+    public OpenGaussComBatchBindPacket(final PostgreSQLPacketPayload payload, final int connectionId) { // parses the whole packet; parameter values are only read when the statement is known
+        payload.readInt4(); // value discarded; presumably the message length (TODO confirm)
+        payload.readInt4(); // value discarded; presumably the number of batched rows (TODO confirm against the openGauss protocol)
+        payload.readStringNul(); // string discarded; presumably the portal name (TODO confirm)
+        statementId = payload.readStringNul();
+        int parameterFormatCount = payload.readInt2(); // number of parameter format codes that follow
+        List<Integer> parameterFormats = new ArrayList<>(parameterFormatCount);
+        for (int i = 0; i < parameterFormatCount; i++) {
+            parameterFormats.add(payload.readInt2());
+        }
+        int resultFormatsLength = payload.readInt2(); // number of result format codes that follow
+        resultFormatCodes = new ArrayList<>(resultFormatsLength);
+        for (int i = 0; i < resultFormatsLength; i++) {
+            resultFormatCodes.add(payload.readInt2());
+        }
+        PostgreSQLBinaryStatement binaryStatement = PostgreSQLBinaryStatementRegistry.getInstance().get(connectionId).getBinaryStatement(statementId);
+        sql = null == binaryStatement ? null : binaryStatement.getSql();
+        List<Object> allParameters = null == sql ? Collections.emptyList() : getParameters(payload, parameterFormats, binaryStatement.getColumnTypes()); // values are left unread when no SQL is available
+        parameters = Lists.partition(allParameters, parameterFormatCount); // NOTE(review): chunk size is the format-code count, not the count read in getParameters, and Guava rejects size 0; confirm clients always send one format code per parameter
+        payload.readInt1(); // discarded; presumably the 'E' byte hasNextParameter stops at (only lines up when getParameters ran; TODO confirm)
+        payload.readStringNul(); // discarded; presumably a portal name (TODO confirm)
+        payload.readInt4(); // discarded; presumably a row limit (TODO confirm)
+    }
+    
+    private List<Object> getParameters(final PostgreSQLPacketPayload payload, final List<Integer> parameterFormats, final List<PostgreSQLBinaryColumnType> columnTypes) { // reads values until hasNextParameter is false; all values end up in one flat list
+        int parameterCount = payload.readInt2(); // used as the modulus below, so formats and column types repeat every parameterCount values
+        List<Object> result = new ArrayList<>(parameterCount);
+        for (int parameterIndex = 0; hasNextParameter(payload); parameterIndex++) {
+            int parameterValueLength = payload.readInt4();
+            if (-1 == parameterValueLength) { // length -1 is stored as null and no value bytes are read
+                result.add(null);
+                continue;
+            }
+            int modedParameterIndex = parameterIndex % parameterCount; // position of this value within its group
+            Object parameterValue = isTextParameterValue(parameterFormats, modedParameterIndex)
+                    ? getTextParameters(payload, parameterValueLength, columnTypes.get(modedParameterIndex)) : getBinaryParameters(payload, parameterValueLength, columnTypes.get(modedParameterIndex));
+            result.add(parameterValue);
+        }
+        return result;
+    }
+    
+    private boolean hasNextParameter(final PostgreSQLPacketPayload payload) { // peeks at the next byte without consuming it
+        payload.getByteBuf().markReaderIndex();
+        int c = payload.readInt1();
+        payload.getByteBuf().resetReaderIndex(); // rewind so the peeked byte is read again by the caller
+        return 'E' != c; // an 'E' byte ends the parameter values
+    }
+    
+    private boolean isTextParameterValue(final List<Integer> parameterFormats, final int parameterIndex) { // text when no format codes were sent or the applicable code is 0
+        return parameterFormats.isEmpty() || 0 == parameterFormats.get(parameterIndex % parameterFormats.size()); // the codes list is cycled via modulo
+    }
+    
+    private Object getTextParameters(final PostgreSQLPacketPayload payload, final int parameterValueLength, final PostgreSQLBinaryColumnType columnType) { // reads parameterValueLength bytes and converts them as text
+        byte[] bytes = new byte[parameterValueLength];
+        payload.getByteBuf().readBytes(bytes);
+        return getTextParameters(new String(bytes), columnType); // NOTE(review): new String(byte[]) decodes with the platform default charset; confirm this matches the client encoding
+    }
+    
+    private Object getTextParameters(final String textValue, final PostgreSQLBinaryColumnType columnType) { // converts the text to a Java object chosen by column type
+        switch (columnType) {
+            case POSTGRESQL_TYPE_UNSPECIFIED:
+                return new PostgreSQLTypeUnspecifiedSQLParameter(textValue);
+            case POSTGRESQL_TYPE_BOOL:
+                return Boolean.valueOf(textValue);
+            case POSTGRESQL_TYPE_INT2:
+            case POSTGRESQL_TYPE_INT4:
+                return Integer.parseInt(textValue); // INT2 and INT4 both become Integer
+            case POSTGRESQL_TYPE_INT8:
+                return Long.parseLong(textValue);
+            case POSTGRESQL_TYPE_FLOAT4:
+                return Float.parseFloat(textValue);
+            case POSTGRESQL_TYPE_FLOAT8:
+                return Double.parseDouble(textValue);
+            case POSTGRESQL_TYPE_NUMERIC: // tried as Integer, then Long, then BigDecimal
+                try {
+                    return Integer.parseInt(textValue);
+                } catch (final NumberFormatException ignored) { // not an int: try long next
+                }
+                try {
+                    return Long.parseLong(textValue);
+                } catch (final NumberFormatException ignored) { // not a long: fall back to BigDecimal
+                }
+                return new BigDecimal(textValue);
+            case POSTGRESQL_TYPE_DATE:
+                return Date.valueOf(textValue);
+            case POSTGRESQL_TYPE_TIME:
+                return Time.valueOf(textValue);
+            case POSTGRESQL_TYPE_TIMESTAMP:
+            case POSTGRESQL_TYPE_TIMESTAMPTZ:
+                return Timestamp.valueOf(textValue);
+            default:
+                return textValue; // every other column type keeps the raw string
+        }
+    }
+    
+    private Object getBinaryParameters(final PostgreSQLPacketPayload payload, final int parameterValueLength, final PostgreSQLBinaryColumnType columnType) { // delegates decoding to the value reader obtained from PostgreSQLBinaryProtocolValueFactory
+        PostgreSQLBinaryProtocolValue binaryProtocolValue = PostgreSQLBinaryProtocolValueFactory.getBinaryProtocolValue(columnType);
+        return binaryProtocolValue.read(payload, parameterValueLength);
+    }
+    
+    /**
+     * Get result format by column index: TEXT when no codes were sent, the single code when exactly one was sent, otherwise the code at the index.
+     *
+     * @param columnIndex column index
+     * @return result format
+     */
+    public PostgreSQLColumnFormat getResultFormatByColumnIndex(final int columnIndex) {
+        if (resultFormatCodes.isEmpty()) {
+            return PostgreSQLColumnFormat.TEXT;
+        }
+        if (1 == resultFormatCodes.size()) {
+            return PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(0));
+        }
+        return PostgreSQLColumnFormat.valueOf(resultFormatCodes.get(columnIndex));
+    }
+    
+    @Override
+    public void write(final PostgreSQLPacketPayload payload) { // no-op: this packet writes nothing to the payload
+    }
+    
+    @Override
+    public PostgreSQLIdentifierTag getIdentifier() {
+        return PostgreSQLCommandPacketType.BATCH_BIND_COMMAND;
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
index a0e9e75..f5bb0c1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.execute.PostgreSQLComExecutePacket;
@@ -31,6 +32,7 @@ import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.Bac
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLComTerminationExecutor;
 import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLUnsupportedCommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.OpenGaussComBatchBindExecutor;
 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.PostgreSQLComBindExecutor;
 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.close.PostgreSQLComCloseExecutor;
 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
@@ -70,6 +72,8 @@ public final class PostgreSQLCommandExecutorFactory {
             case BIND_COMMAND:
                 connectionContext.getPendingExecutors().add(new PostgreSQLComBindExecutor(connectionContext, (PostgreSQLComBindPacket) commandPacket, backendConnection));
                 break;
+            case BATCH_BIND_COMMAND:
+                return new OpenGaussComBatchBindExecutor(connectionContext, (OpenGaussComBatchBindPacket) commandPacket, backendConnection);
             case DESCRIBE_COMMAND:
                 connectionContext.getPendingExecutors().add(new PostgreSQLComDescribeExecutor(connectionContext));
                 break;
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
new file mode 100644
index 0000000..b85a712
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/OpenGaussComBatchBindExecutor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLBinaryColumnType;
+import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnFormat;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.OpenGaussComBatchBindPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
+import org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Command batch bind executor for openGauss; executes the packet's SQL once per parameter group and then serves the resulting rows.
+ */
+@RequiredArgsConstructor
+public final class OpenGaussComBatchBindExecutor implements QueryCommandExecutor {
+    
+    private final PostgreSQLConnectionContext connectionContext;
+    
+    private final OpenGaussComBatchBindPacket packet;
+    
+    private final BackendConnection backendConnection;
+    
+    private final List<DatabaseCommunicationEngine> databaseCommunicationEngines = new LinkedList<>(); // one engine per parameter group, appended by init() and removed by next() when exhausted
+    
+    @Getter
+    private volatile ResponseType responseType;
+    
+    private boolean batchBindComplete; // set by next() once no engine has rows left; switches getQueryRowPacket to the command-complete packet
+    
+    @Override
+    public Collection<DatabasePacket<?>> execute() throws SQLException { // runs the statement once per parameter group and returns a single bind-complete packet
+        List<List<Object>> parameters = packet.getParameters();
+        for (int i = 0; i < parameters.size(); i++) {
+            List<Object> parameter = parameters.get(i);
+            init(parameter);
+            ResponseHeader responseHeader = databaseCommunicationEngines.get(i).execute(); // the engine init() just appended (assumes the list was empty before execute() started)
+            if (responseHeader instanceof QueryResponseHeader && connectionContext.getDescribeExecutor().isPresent()) {
+                connectionContext.getDescribeExecutor().get().setRowDescriptionPacket(getRowDescriptionPacket((QueryResponseHeader) responseHeader));
+            }
+            if (responseHeader instanceof UpdateResponseHeader) {
+                responseType = ResponseType.UPDATE;
+                connectionContext.setUpdateCount(connectionContext.getUpdateCount() + ((UpdateResponseHeader) responseHeader).getUpdateCount()); // accumulate the update count across the batch
+            }
+        }
+        return Collections.singletonList(new PostgreSQLBindCompletePacket());
+    }
+    
+    private void init(final List<Object> parameter) { // creates a binary-protocol engine for one parameter group and appends it to databaseCommunicationEngines
+        databaseCommunicationEngines.add(DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(getSqlStatement(), packet.getSql(), parameter, backendConnection));
+    }
+    
+    private SQLStatement getSqlStatement() { // returns the statement held by the connection context, parsing and storing it there when absent
+        return connectionContext.getSqlStatement().orElseGet(() -> {
+            SQLStatement result = parseSql(packet.getSql(), backendConnection.getSchemaName());
+            connectionContext.setSqlStatement(result);
+            return result;
+        });
+    }
+    
+    private SQLStatement parseSql(final String sql, final String schemaName) { // parses sql with a parser engine built for the schema's database type
+        if (sql.isEmpty()) { // empty SQL yields EmptyStatement without invoking the parser
+            return new EmptyStatement();
+        }
+        ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(
+                DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
+        return sqlStatementParserEngine.parse(sql, true);
+    }
+    
+    private PostgreSQLRowDescriptionPacket getRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) { // marks the response as QUERY and builds the row description from the query headers
+        responseType = ResponseType.QUERY;
+        Collection<PostgreSQLColumnDescription> columnDescriptions = createColumnDescriptions(queryResponseHeader);
+        return new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions);
+    }
+    
+    private Collection<PostgreSQLColumnDescription> createColumnDescriptions(final QueryResponseHeader queryResponseHeader) { // one description per query header
+        Collection<PostgreSQLColumnDescription> result = new LinkedList<>();
+        int columnIndex = 0;
+        for (QueryHeader each : queryResponseHeader.getQueryHeaders()) {
+            result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), each.getColumnTypeName())); // pre-increment makes column indexes start at 1
+        }
+        return result;
+    }
+    
+    @Override
+    public boolean next() throws SQLException { // advances to the next row, dropping exhausted engines; see the final return for the extra true
+        Iterator<DatabaseCommunicationEngine> iterator = databaseCommunicationEngines.iterator();
+        while (iterator.hasNext()) {
+            if (iterator.next().next()) {
+                return true;
+            } else {
+                iterator.remove(); // this engine has no more rows
+            }
+        }
+        return !batchBindComplete && (batchBindComplete = true); // true exactly once after all engines are drained, so getQueryRowPacket can return the command-complete packet; false afterwards
+    }
+    
+    @Override
+    public PostgreSQLPacket getQueryRowPacket() throws SQLException { // returns the current data row, or the command-complete packet once batchBindComplete is set
+        if (batchBindComplete) {
+            String sqlCommand = connectionContext.getSqlStatement().map(SQLStatement::getClass).map(PostgreSQLCommand::valueOf).map(command -> command.map(Enum::name).orElse("")).orElse("");
+            return new PostgreSQLCommandCompletePacket(sqlCommand, connectionContext.getUpdateCount());
+        }
+        QueryResponseRow queryResponseRow = databaseCommunicationEngines.get(0).getQueryResponseRow(); // next() removes exhausted engines from the front, so the head is the engine that just produced a row
+        return new PostgreSQLDataRowPacket(getData(queryResponseRow));
+    }
+    
+    private List<Object> getData(final QueryResponseRow queryResponseRow) { // maps each cell to a BinaryCell or its raw data according to the packet's result format for that column
+        Collection<QueryResponseCell> cells = queryResponseRow.getCells();
+        List<Object> result = new ArrayList<>(cells.size());
+        List<QueryResponseCell> columns = new ArrayList<>(cells); // copied into a list for positional access
+        for (int i = 0; i < columns.size(); i++) {
+            PostgreSQLColumnFormat format = packet.getResultFormatByColumnIndex(i);
+            result.add(PostgreSQLColumnFormat.BINARY == format ? createBinaryCell(columns.get(i)) : columns.get(i).getData());
+        }
+        return result;
+    }
+    
+    private BinaryCell createBinaryCell(final QueryResponseCell cell) { // assumes cell is a BinaryQueryResponseCell; the cast is unchecked
+        return new BinaryCell(PostgreSQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell) cell).getJdbcType()), cell.getData());
+    }
+}