You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/04/03 23:14:03 UTC
[phoenix] branch 5.1 updated: PHOENIX-6821: Optimize batching in auto-commit mode (#1587)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 3ee1470633 PHOENIX-6821: Optimize batching in auto-commit mode (#1587)
3ee1470633 is described below
commit 3ee1470633bdc214c19616c01441436019a79e5e
Author: Hari Krishna Dara <ha...@gmail.com>
AuthorDate: Tue Apr 4 04:43:55 2023 +0530
PHOENIX-6821: Optimize batching in auto-commit mode (#1587)
* PHOENIX-6821: Optimize batching in auto-commit mode
Squash of the change from PR https://github.com/apache/phoenix/pull/1570
* Fix a merge issue with cherry-pick
* Additional changes needed to fix compilation errors for 5.1 branch
* Fix checkstyle errors
* Fix checkstyle errors
* Remove extraneous whitespace
---
.../org/apache/phoenix/end2end/UpsertValuesIT.java | 182 ++++++++++++++++++---
.../phoenix/exception/BatchUpdateExecution.java | 36 ----
.../apache/phoenix/exception/SQLExceptionCode.java | 21 ++-
.../org/apache/phoenix/execute/MutationState.java | 3 +
.../phoenix/jdbc/PhoenixPreparedStatement.java | 26 +--
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 39 +++--
.../phoenix/jdbc/PhoenixPreparedStatementTest.java | 93 -----------
...tatementTest.java => PhoenixStatementTest.java} | 141 ++++++++++------
8 files changed, 307 insertions(+), 234 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 0fb46b1abe..408e7ca0b0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -17,16 +17,19 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.util.PhoenixRuntime.REQUEST_METRIC_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.TestUtil.closeStatement;
import static org.apache.phoenix.util.TestUtil.closeStmtAndConn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.math.RoundingMode;
+import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
@@ -36,6 +39,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.client.Result;
@@ -45,6 +49,8 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.DateUtil;
@@ -55,6 +61,7 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
@Category(ParallelStatsDisabledTest.class)
@@ -454,23 +461,25 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT {
closeStmtAndConn(stmt, conn);
}
}
-
-
- @Test
- public void testBatchedUpsert() throws Exception {
+
+ private void testBatchedUpsert(boolean autocommit) throws Exception {
String tableName = generateUniqueName();
Properties props = new Properties();
+ props.setProperty(REQUEST_METRIC_ATTRIB, "true");
+ props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
Connection conn = null;
PreparedStatement pstmt = null;
+ Statement stmt = null;
try {
conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
} finally {
closeStmtAndConn(pstmt, conn);
}
-
+
try {
conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autocommit);
pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
pstmt.setString(1, "a");
pstmt.setInt(2, 1);
@@ -478,12 +487,21 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT {
pstmt.setString(1, "b");
pstmt.setInt(2, 2);
pstmt.addBatch();
+ pstmt.setString(1, "c");
+ pstmt.setInt(2, 3);
+ pstmt.addBatch();
pstmt.executeBatch();
- conn.commit();
+ if (!autocommit) {
+ conn.commit();
+ }
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ Map<String, Map<MetricType, Long>> mutationMetrics = pConn.getMutationMetrics();
+ Assert.assertEquals(3, (long) mutationMetrics.get(tableName).get(MetricType.MUTATION_BATCH_SIZE));
+ Assert.assertEquals(autocommit, conn.getAutoCommit());
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(pstmt, conn);
}
-
+
try {
conn = DriverManager.getConnection(getUrl(), props);
pstmt = conn.prepareStatement("select * from " + tableName);
@@ -494,49 +512,161 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT {
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals(2, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals(3, rs.getInt(2));
assertFalse(rs.next());
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(pstmt, conn);
}
-
- conn = DriverManager.getConnection(getUrl(), props);
- Statement stmt = conn.createStatement();
+
try {
- stmt.addBatch("upsert into " + tableName + " values ('c', 3)");
- stmt.addBatch("select count(*) from " + tableName);
- stmt.addBatch("upsert into " + tableName + " values ('a', 4)");
+ conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autocommit);
+ stmt = conn.createStatement();
+ stmt.addBatch("upsert into " + tableName + " values ('d', 4)");
+ stmt.addBatch("upsert into " + tableName + " values ('a', 5)");
ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
+ assertEquals(3, rs.getInt(1));
int[] result = stmt.executeBatch();
- assertEquals(3,result.length);
+ assertEquals(2, result.length);
assertEquals(result[0], 1);
- assertEquals(result[1], -2);
- assertEquals(result[2], 1);
+ assertEquals(result[1], 1);
conn.commit();
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(stmt, conn);
}
-
+
try {
conn = DriverManager.getConnection(getUrl(), props);
- pstmt = conn.prepareStatement("select * from " + tableName);
- ResultSet rs = pstmt.executeQuery();
+ stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select * from " + tableName);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
- assertEquals(4, rs.getInt(2));
+ assertEquals(5, rs.getInt(2));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals(2, rs.getInt(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals(3, rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals("d", rs.getString(1));
+ assertEquals(4, rs.getInt(2));
assertFalse(rs.next());
} finally {
- closeStmtAndConn(pstmt, conn);
+ closeStmtAndConn(stmt, conn);
}
+
+ try {
+ conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(autocommit);
+ stmt = conn.createStatement();
+ stmt.addBatch("delete from " + tableName + " where v <= 4");
+ stmt.addBatch("delete from " + tableName + " where v = 5");
+ int[] result = stmt.executeBatch();
+ assertEquals(2, result.length);
+ assertEquals(result[0], 3);
+ assertEquals(result[1], 1);
+ conn.commit();
+ } finally {
+ closeStmtAndConn(stmt, conn);
+ }
+ try {
+ conn = DriverManager.getConnection(getUrl(), props);
+ pstmt = conn.prepareStatement("select count(*) from " + tableName);
+ ResultSet rs = pstmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ } finally {
+ closeStmtAndConn(stmt, conn);
+ }
+
}
-
+
+ @Test
+ public void testBatchedUpsert() throws Exception {
+ testBatchedUpsert(false);
+ }
+
+ @Test
+ public void testBatchedUpsertAutoCommit() throws Exception {
+ testBatchedUpsert(true);
+ }
+
+ @Test
+ public void testBatchedUpsertMultipleBatches() throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+ PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
+ pstmt.setString(1, "a");
+ pstmt.setInt(2, 1);
+ pstmt.addBatch();
+ pstmt.executeBatch();
+ pstmt.setString(1, "b");
+ pstmt.setInt(2, 2);
+ pstmt.addBatch();
+ pstmt.executeBatch();
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ }
+ }
+
+ private void testBatchRollback(boolean autocommit) throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+ conn.setAutoCommit(autocommit);
+ PreparedStatement pstmt = conn.prepareStatement("upsert into " + tableName + " values (?, ?)");
+ pstmt.setString(1, "a");
+ pstmt.setInt(2, 1);
+ pstmt.addBatch();
+ pstmt.executeBatch();
+ conn.rollback();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("select count(*) from " + tableName);
+ assertTrue(rs.next());
+ assertEquals(autocommit ? 1 : 0, rs.getInt(1));
+ }
+ }
+
+ @Test
+ public void testBatchRollback() throws Exception {
+ testBatchRollback(false);
+ }
+
+ @Test
+ public void testBatchNoRollbackWithAutoCommit() throws Exception {
+ testBatchRollback(true);
+ }
+
+ @Test
+ public void testDQLFailsInBatch() throws Exception {
+ String tableName = generateUniqueName();
+ Properties props = new Properties();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("create table " + tableName + " (k varchar primary key, v integer)");
+ Statement stmt = conn.createStatement();
+ stmt.addBatch("select * from " + tableName);
+ BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+ assertEquals("java.sql.BatchUpdateException: ERROR 1151 (XCL51): A batch operation can't include a statement that produces result sets.",
+ ex.getMessage());
+ }
+ }
+
private static Date toDate(String dateString) {
return DateUtil.parseDate(dateString);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
deleted file mode 100644
index d3cd82b54a..0000000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.exception;
-
-import java.sql.SQLException;
-
-public class BatchUpdateExecution extends SQLException {
- private static final long serialVersionUID = 1L;
- private static SQLExceptionCode code = SQLExceptionCode.BATCH_EXCEPTION;
- private final int batchIndex;
-
- public BatchUpdateExecution(Throwable cause, int batchIndex) {
- super(new SQLExceptionInfo.Builder(code).build().toString(),
- code.getSQLState(), code.getErrorCode(), cause);
- this.batchIndex = batchIndex;
- }
-
- public int getBatchIndex() {
- return batchIndex;
- }
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 3907ebd555..26b17e6964 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.exception;
+import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.util.Map;
@@ -444,6 +445,8 @@ public enum SQLExceptionCode {
"Duplicate ENCODED_QUALIFIER."),
MISSING_CQ(1150, "XCL49",
"Missing ENCODED_QUALIFIER."),
+ EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+ + "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
/**
@@ -611,15 +614,16 @@ public enum SQLExceptionCode {
}
public static interface Factory {
- public static final Factory DEFAULT = new Factory() {
+ Factory DEFAULT = new Factory() {
@Override
public SQLException newException(SQLExceptionInfo info) {
- return new SQLException(info.toString(), info.getCode().getSQLState(), info.getCode().getErrorCode(), info.getRootCause());
+ return new SQLException(info.toString(), info.getCode().getSQLState(),
+ info.getCode().getErrorCode(), info.getRootCause());
}
};
- public static final Factory SYNTAX_ERROR = new Factory() {
+ Factory SYNTAX_ERROR = new Factory() {
@Override
public SQLException newException(SQLExceptionInfo info) {
@@ -627,7 +631,16 @@ public enum SQLExceptionCode {
}
};
- public SQLException newException(SQLExceptionInfo info);
+ Factory BATCH_UPDATE_ERROR = new Factory() {
+
+ @Override
+ public SQLException newException(SQLExceptionInfo info) {
+ return new BatchUpdateException(info.toString(), info.getCode().getSQLState(),
+ info.getCode().getErrorCode(), (int[]) null, info.getRootCause());
+ }
+
+ };
+ SQLException newException(SQLExceptionInfo info);
}
private static final Map<Integer,SQLExceptionCode> errorCodeMap = Maps.newHashMapWithExpectedSize(SQLExceptionCode.values().length);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e32f5a6e76..04cb379c42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1946,4 +1946,7 @@ public class MutationState implements SQLCloseable {
return mutationMetricQueue;
}
+ public boolean isEmpty() {
+ return mutationsMap != null ? mutationsMap.isEmpty() : true;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index d955777cd8..be0324e0bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -160,11 +160,23 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
return compileMutation(statement, query);
}
- boolean execute(boolean batched) throws SQLException {
+ void executeForBatch() throws SQLException {
throwIfUnboundParameters();
- if (!batched && statement.getOperation().isMutation() && !batch.isEmpty()) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
- .build().buildException();
+ if (!statement.getOperation().isMutation()) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET)
+ .build().buildException();
+ }
+ executeMutation(statement, createAuditQueryLogger(statement, query));
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ throwIfUnboundParameters();
+ if (statement.getOperation().isMutation() && !batch.isEmpty()) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+ .build().buildException();
}
if (statement.getOperation().isMutation()) {
executeMutation(statement, createAuditQueryLogger(statement,query));
@@ -172,12 +184,6 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
}
executeQuery(statement, createQueryLogger(statement,query));
return true;
-
- }
-
- @Override
- public boolean execute() throws SQLException {
- return execute(false);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 5562c8d5c5..64e4166171 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ
import java.io.File;
import java.io.IOException;
import java.io.Reader;
+import java.sql.BatchUpdateException;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -78,7 +79,6 @@ import org.apache.phoenix.compile.StatementPlan;
import org.apache.phoenix.compile.TraceQueryPlan;
import org.apache.phoenix.compile.UpsertCompiler;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.BatchUpdateExecution;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.UpgradeRequiredException;
@@ -456,7 +456,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
setLastUpdateCount(lastUpdateCount);
setLastUpdateOperation(stmt.getOperation());
connection.incrementStatementExecutionCounter();
- if(queryLogger.isAuditLoggingEnabled()) {
+ if (queryLogger.isAuditLoggingEnabled()) {
queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString());
queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount);
@@ -489,7 +489,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}, PhoenixContextExecutor.inContext(),
Tracing.withTracing(connection, this.toString()));
} catch (Exception e) {
- if(queryLogger.isAuditLoggingEnabled()) {
+ if (queryLogger.isAuditLoggingEnabled()) {
queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt));
queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e));
queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString());
@@ -861,7 +861,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
@SuppressWarnings("unchecked")
@Override
public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
- if(!getUdfParseNodes().isEmpty()) {
+ if (!getUdfParseNodes().isEmpty()) {
stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
}
DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation());
@@ -1829,26 +1829,41 @@ public class PhoenixStatement implements Statement, SQLCloseable {
/**
* Execute the current batch of statements. If any exception occurs
- * during execution, a org.apache.phoenix.exception.BatchUpdateException
- * is thrown which includes the index of the statement within the
- * batch when the exception occurred.
+ * during execution, a {@link java.sql.BatchUpdateException}
+ * is thrown which contains the update counts for statements executed so
+ * far.
*/
@Override
public int[] executeBatch() throws SQLException {
int i = 0;
+ int[] returnCodes = new int [batch.size()];
+ Arrays.fill(returnCodes, -1);
+ boolean autoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
try {
- int[] returnCodes = new int [batch.size()];
for (i = 0; i < returnCodes.length; i++) {
PhoenixPreparedStatement statement = batch.get(i);
- returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
+ statement.executeForBatch();
+ returnCodes[i] = statement.getUpdateCount();
}
// Flush all changes in batch if auto flush is true
flushIfNecessary();
// If we make it all the way through, clear the batch
clearBatch();
+ if (autoCommit) {
+ connection.commit();
+ }
return returnCodes;
- } catch (Throwable t) {
- throw new BatchUpdateExecution(t,i);
+ } catch (SQLException t) {
+ if (i == returnCodes.length) {
+ // Exception after for loop, perhaps in commit(), discard returnCodes.
+ throw new BatchUpdateException(t);
+ } else {
+ returnCodes[i] = Statement.EXECUTE_FAILED;
+ throw new BatchUpdateException(returnCodes, t);
+ }
+ } finally {
+ connection.setAutoCommit(autoCommit);
}
}
@@ -1931,7 +1946,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
TableName tableName = null;
if (stmt instanceof ExecutableSelectStatement) {
TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
- if(from instanceof NamedTableNode) {
+ if (from instanceof NamedTableNode) {
tableName = ((NamedTableNode)from).getName();
}
} else if (stmt instanceof ExecutableUpsertStatement) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
index 56a524c6ce..616e3a029a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
@@ -85,97 +85,4 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
}
}
-
- @Test
- /**
- * Validates that if a user sets the query timeout via the
- * stmt.setQueryTimeout() JDBC method, we correctly store the timeout
- * in both milliseconds and seconds.
- */
- public void testSettingQueryTimeoutViaJdbc() throws Exception {
- // Arrange
- Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Act
- stmt.setQueryTimeout(3);
-
- // Assert
- assertEquals(3, stmt.getQueryTimeout());
- assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- /**
- * Validates if a user sets the timeout to zero that we store the timeout
- * in millis as the Integer.MAX_VALUE.
- */
- public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
- // Arrange
- Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Act
- stmt.setQueryTimeout(0);
-
- // Assert
- assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
- assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- /**
- * Validates that is negative value is supplied we set the timeout to the default.
- */
- public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
- // Arrange
- Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
- PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
- int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
- QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
-
- // Act
- stmt.setQueryTimeout(-1);
-
- // Assert
- assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
- assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- /**
- * Validates that setting custom phoenix query timeout using
- * the phoenix.query.timeoutMs config property is honored.
- */
- public void testCustomQueryTimeout() throws Exception {
- // Arrange
- Properties connectionProperties = new Properties();
- connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
- Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Assert
- assertEquals(3, stmt.getQueryTimeout());
- assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
- }
-
- @Test
- public void testZeroCustomQueryTimeout() throws Exception {
- // Arrange
- Properties connectionProperties = new Properties();
- connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
- Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
- PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
- // Assert
- assertEquals(0, stmt.getQueryTimeout());
- assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
- }
-
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
similarity index 60%
copy from phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
copy to phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
index 56a524c6ce..2043cd7aab 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixStatementTest.java
@@ -18,74 +18,56 @@
package org.apache.phoenix.jdbc;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.sql.*;
+import java.util.List;
import java.util.Properties;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
-public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
+public class PhoenixStatementTest extends BaseConnectionlessQueryTest {
- @Test
- public void testSetParameter_InvalidIndex() throws Exception {
- Properties connectionProperties = new Properties();
- Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
-
- PreparedStatement stmt = connection.prepareStatement(
- "UPSERT INTO " + ATABLE + " (organization_id, entity_id, a_integer) " +
- "VALUES (?,?,?)");
-
- stmt.setString(1, "AAA");
- stmt.setString(2, "BBB");
- stmt.setInt(3, 1);
-
- try {
- stmt.setString(4, "Invalid bind column");
- fail("Setting a value for a column that doesn't exist should throw SQLException");
- } catch (SQLException e) {
- // Expected exception
- }
-
- try {
- stmt.setString(-1, "Invalid bind column");
- fail("Setting a value for a column that doesn't exist should throw SQLException");
- } catch (SQLException e) {
- // Expected exception
- }
- }
-
@Test
public void testMutationUsingExecuteQueryShouldFail() throws Exception {
Properties connectionProperties = new Properties();
Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("DELETE FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
try {
- stmt.executeQuery();
+ stmt.executeQuery("DELETE FROM " + ATABLE);
fail();
} catch(SQLException e) {
assertEquals(SQLExceptionCode.EXECUTE_QUERY_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
}
}
-
+
@Test
public void testQueriesUsingExecuteUpdateShouldFail() throws Exception {
Properties connectionProperties = new Properties();
Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
try {
- stmt.executeUpdate();
+ stmt.executeUpdate("SELECT * FROM " + ATABLE);
fail();
} catch(SQLException e) {
assertEquals(SQLExceptionCode.EXECUTE_UPDATE_NOT_APPLICABLE.getErrorCode(), e.getErrorCode());
}
}
-
+
@Test
/**
* Validates that if a user sets the query timeout via the
@@ -95,17 +77,17 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
public void testSettingQueryTimeoutViaJdbc() throws Exception {
// Arrange
Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
+
// Act
stmt.setQueryTimeout(3);
-
+
// Assert
assertEquals(3, stmt.getQueryTimeout());
assertEquals(3000, phoenixStmt.getQueryTimeoutInMillis());
}
-
+
@Test
/**
* Validates if a user sets the timeout to zero that we store the timeout
@@ -114,17 +96,17 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
public void testSettingZeroQueryTimeoutViaJdbc() throws Exception {
// Arrange
Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
+
// Act
stmt.setQueryTimeout(0);
-
+
// Assert
assertEquals(Integer.MAX_VALUE / 1000, stmt.getQueryTimeout());
assertEquals(Integer.MAX_VALUE, phoenixStmt.getQueryTimeoutInMillis());
}
-
+
@Test
/**
* Validates that is negative value is supplied we set the timeout to the default.
@@ -132,20 +114,20 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
public void testSettingNegativeQueryTimeoutViaJdbc() throws Exception {
// Arrange
Connection connection = DriverManager.getConnection(getUrl());
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
int defaultQueryTimeout = phoenixConnection.getQueryServices().getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
-
+
// Act
stmt.setQueryTimeout(-1);
-
+
// Assert
assertEquals(defaultQueryTimeout / 1000, stmt.getQueryTimeout());
assertEquals(defaultQueryTimeout, phoenixStmt.getQueryTimeoutInMillis());
}
-
+
@Test
/**
* Validates that setting custom phoenix query timeout using
@@ -156,26 +138,79 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
Properties connectionProperties = new Properties();
connectionProperties.setProperty("phoenix.query.timeoutMs", "2350");
Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
+
// Assert
assertEquals(3, stmt.getQueryTimeout());
assertEquals(2350, phoenixStmt.getQueryTimeoutInMillis());
}
-
+
@Test
public void testZeroCustomQueryTimeout() throws Exception {
// Arrange
Properties connectionProperties = new Properties();
connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
- PreparedStatement stmt = connection.prepareStatement("SELECT * FROM " + ATABLE);
+ Statement stmt = connection.createStatement();
PhoenixStatement phoenixStmt = stmt.unwrap(PhoenixStatement.class);
-
+
// Assert
assertEquals(0, stmt.getQueryTimeout());
assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
}
+ @Test
+ public void testExecuteBatchWithFailedStatement() throws Exception {
+ // Arrange
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+ Whitebox.setInternalState(stmt, "connection", connSpy);
+ List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+ mock(PhoenixPreparedStatement.class),
+ mock(PhoenixPreparedStatement.class),
+ mock(PhoenixPreparedStatement.class));
+ Whitebox.setInternalState(stmt, "batch", batch);
+ final String exMsg = "TEST";
+ when(batch.get(0).getUpdateCount()).thenReturn(1);
+ doThrow(new SQLException(exMsg)).when(batch.get(1)).executeForBatch();
+ // However, we don't expect this to be called.
+ when(batch.get(1).getUpdateCount()).thenReturn(1);
+
+ // Act & Assert
+ BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+ assertEquals(exMsg, ex.getCause().getMessage());
+ int[] updateCounts = ex.getUpdateCounts();
+ assertEquals(3, updateCounts.length);
+ assertEquals(1, updateCounts[0]);
+ assertEquals(Statement.EXECUTE_FAILED, updateCounts[1]);
+ assertEquals(-1, updateCounts[2]);
+ verify(connSpy, never()).commit(); // Ensure commit was never called.
+ }
+
+ @Test
+ public void testExecuteBatchWithCommitFailure() throws Exception {
+ // Arrange
+ Properties connectionProperties = new Properties();
+ connectionProperties.setProperty("phoenix.query.timeoutMs", "0");
+ Connection connection = DriverManager.getConnection(getUrl(), connectionProperties);
+ Statement stmt = connection.createStatement();
+ PhoenixConnection connSpy = spy(connection.unwrap(PhoenixConnection.class));
+ Whitebox.setInternalState(stmt, "connection", connSpy);
+ List<PhoenixPreparedStatement> batch = Lists.newArrayList(
+ mock(PhoenixPreparedStatement.class));
+ Whitebox.setInternalState(stmt, "batch", batch);
+ final String exMsg = "TEST";
+ doThrow(new SQLException(exMsg)).when(connSpy).commit();
+ when(connSpy.getAutoCommit()).thenReturn(true);
+
+ // Act & Assert
+ BatchUpdateException ex = assertThrows(BatchUpdateException.class, () -> stmt.executeBatch());
+ assertEquals(exMsg, ex.getCause().getMessage());
+ assertNull(ex.getUpdateCounts());
+ }
+
}