You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:41:14 UTC
[39/50] [abbrv] phoenix git commit: Surface partial saves in
CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37
Surface partial saves in CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa58c782
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa58c782
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa58c782
Branch: refs/heads/calcite
Commit: fa58c7821a2e8fce30a8c0ff6e42aa00134dbce0
Parents: dab9d51
Author: Eli Levine <el...@apache.org>
Authored: Thu Feb 26 20:50:02 2015 -0800
Committer: Eli Levine <el...@apache.org>
Committed: Thu Feb 26 20:50:02 2015 -0800
----------------------------------------------------------------------
.../apache/phoenix/execute/PartialCommitIT.java | 302 +++++++++++++++++++
.../apache/phoenix/compile/DeleteCompiler.java | 13 +-
.../apache/phoenix/compile/UpsertCompiler.java | 13 +-
.../apache/phoenix/execute/CommitException.java | 35 ++-
.../apache/phoenix/execute/MutationState.java | 156 ++++++----
.../apache/phoenix/jdbc/PhoenixConnection.java | 37 ++-
.../phoenix/jdbc/PhoenixPreparedStatement.java | 7 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 3 +
.../phoenix/execute/MutationStateTest.java | 64 ++++
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
10 files changed, 543 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
new file mode 100644
index 0000000..550d7de
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicablelaw or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Collections.singletonList;
+import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialCommitIT {
+
+ private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase();
+ private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
+ private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
+ private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
+ private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'";
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static String url;
+ private static Driver driver;
+ private static final Properties props = new Properties();
+
+ static {
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
+ conf.setBoolean("hbase.coprocessor.abortonerror", false);
+ conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+ TEST_UTIL.startMiniCluster();
+ String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+ url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+ + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Must update config before starting server
+ props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+ driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+ createTablesWithABitOfData();
+ }
+
+ private static void createTablesWithABitOfData() throws Exception {
+ Properties props = new Properties();
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
+
+ try (Connection con = driver.connect(url, new Properties())) {
+ Statement sta = con.createStatement();
+ sta.execute("create table a_success_table (k varchar primary key, c varchar)");
+ sta.execute("create table b_failure_table (k varchar primary key, c varchar)");
+ sta.execute("create table c_success_table (k varchar primary key, c varchar)");
+ con.commit();
+ }
+
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
+
+ try (Connection con = driver.connect(url, new Properties())) {
+ con.setAutoCommit(false);
+ Statement sta = con.createStatement();
+ for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) {
+ sta.execute("upsert into " + table + " values ('z', 'z')");
+ sta.execute("upsert into " + table + " values ('zz', 'zz')");
+ sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
+ }
+ con.commit();
+ }
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testNoFailure() {
+ testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false,
+ singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
+ }
+
+ @Test
+ public void testUpsertFailure() {
+ testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')",
+ UPSERT_TO_FAIL,
+ "upsert into a_success_table values ('testUpsertFailure2', 'b')"),
+ 1, new int[]{1}, true,
+ newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'",
+ "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
+ newArrayList(new Integer(2), new Integer(0)));
+ }
+
+ @Test
+ public void testUpsertSelectFailure() throws SQLException {
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
+
+ try (Connection con = driver.connect(url, new Properties())) {
+ con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
+ con.commit();
+ }
+
+ testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')",
+ UPSERT_SELECT_TO_FAIL),
+ 1, new int[]{1}, true,
+ newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')",
+ "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
+ newArrayList(new Integer(2), new Integer(0)));
+ }
+
+ @Test
+ public void testDeleteFailure() {
+ testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')",
+ DELETE_TO_FAIL,
+ "upsert into a_success_table values ('testDeleteFailure2', 'b')"),
+ 1, new int[]{1}, true,
+ newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'",
+ "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"),
+ newArrayList(new Integer(2), new Integer(1)));
+ }
+
+ /**
+ * {@link MutationState} keeps mutations ordered lexicographically by table name.
+ */
+ @Test
+ public void testOrderOfMutationsIsPredicatable() {
+ testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
+ UPSERT_TO_FAIL,
+ "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
+ 2, new int[]{0,1}, true,
+ newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'",
+ "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'",
+ "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
+ newArrayList(new Integer(0), new Integer(1), new Integer(0)));
+ }
+
+ @Test
+ public void checkThatAllStatementTypesMaintainOrderInConnection() {
+ testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')",
+ "upsert into a_success_table select k, c from c_success_table",
+ DELETE_TO_FAIL,
+ "select * from a_success_table",
+ UPSERT_TO_FAIL),
+ 2, new int[]{2,4}, true,
+ newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
+ "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
+ "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"),
+ newArrayList(new Integer(4), new Integer(0), new Integer(1)));
+ }
+
+ private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
+ List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+ Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
+
+ try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
+ con.setAutoCommit(false);
+ Statement sta = con.createStatement();
+ for (String statement : statements) {
+ sta.execute(statement);
+ }
+ try {
+ con.commit();
+ if (willFail) {
+ fail("Expected at least one statement in the list to fail");
+ } else {
+ assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit()
+ }
+ } catch (SQLException sqle) {
+ if (!willFail) {
+ fail("Expected no statements to fail");
+ }
+ assertEquals(CommitException.class, sqle.getClass());
+ int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
+ assertEquals(failureCount, uncommittedStatementIndexes.length);
+ assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
+ }
+
+ // verify data in HBase
+ for (int i = 0; i < countStatementsForVerification.size(); i++) {
+ String countStatement = countStatementsForVerification.get(i);
+ ResultSet rs = sta.executeQuery(countStatement);
+ if (!rs.next()) {
+ fail("Expected a single row from count query");
+ }
+ assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1));
+ }
+ } catch (SQLException e) {
+ fail(e.toString());
+ }
+ }
+
+ private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
+ Connection con = driver.connect(url, new Properties());
+ PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
+ final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
+ return new PhoenixConnection(phxCon) {
+ protected MutationState newMutationState(int maxSize) {
+ return new MutationState(maxSize, this, mutations);
+ };
+ };
+ }
+
+ public static class FailingRegionObserver extends SimpleRegionObserver {
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+ final Durability durability) throws HBaseIOException {
+ if (shouldFailUpsert(c, put) || shouldFailDelete(c, put)) {
+ // throwing anything other than instances of IOException result
+ // in this coprocessor being unloaded
+ // DoNotRetryIOException tells HBase not to retry this mutation
+ // multiple times
+ throw new DoNotRetryIOException();
+ }
+ }
+
+ private static boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+ String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+ return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow());
+ }
+
+ private static boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+ String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+ return TABLE_NAME_TO_FAIL.equals(tableName) &&
+ // Phoenix deletes are sent as Puts with empty values
+ put.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0;
+ }
+ }
+
+ /**
+ * Used for ordering {@link MutationState#mutations} map.
+ */
+ private static class TableRefComparator implements Comparator<TableRef> {
+ @Override
+ public int compare(TableRef tr1, TableRef tr2) {
+ return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 322d24a..6f51a4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
@@ -106,8 +107,8 @@ public class DeleteCompiler {
ConnectionQueryServices services = connection.getQueryServices();
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null;
+ Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+ Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
// If indexTableRef is set, we're deleting the rows from both the index table and
// the data table through a single query to save executing an additional one.
if (indexTableRef != null) {
@@ -147,11 +148,11 @@ public class DeleteCompiler {
}
table.newKey(ptr, values);
}
- mutations.put(ptr, PRow.DELETE_MARKER);
+ mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
if (indexTableRef != null) {
ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
rs.getCurrentRow().getKey(indexPtr);
- indexMutations.put(indexPtr, PRow.DELETE_MARKER);
+ indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
}
if (mutations.size() > maxSize) {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -429,9 +430,9 @@ public class DeleteCompiler {
// keys for our ranges
ScanRanges ranges = context.getScanRanges();
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+ Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) {
- mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER);
+ mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
}
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index b21cc2f..f172814 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -95,7 +96,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class UpsertCompiler {
- private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) {
+ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) {
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -114,7 +115,7 @@ public class UpsertCompiler {
}
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
table.newKey(ptr, pkValues);
- mutation.put(ptr, columnValues);
+ mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
}
private static MutationState upsertSelect(PhoenixStatement statement,
@@ -128,7 +129,7 @@ public class UpsertCompiler {
boolean isAutoCommit = connection.getAutoCommit();
byte[][] values = new byte[columnIndexes.length][];
int rowCount = 0;
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+ Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
PTable table = tableRef.getTable();
ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -156,7 +157,7 @@ public class UpsertCompiler {
column.getMaxLength(), column.getScale(), column.getSortOrder());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
if (isAutoCommit && rowCount % batchSize == 0) {
@@ -802,8 +803,8 @@ public class UpsertCompiler {
throw new IllegalStateException();
}
}
- Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
- setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation);
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+ setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement);
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index 63bf6a1..a9d8311 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -19,23 +19,32 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
public class CommitException extends SQLException {
- private static final long serialVersionUID = 1L;
- private final MutationState uncommittedState;
- private final MutationState committedState;
+ private static final long serialVersionUID = 2L;
+ private final int[] uncommittedStatementIndexes;
- public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
+ public CommitException(Exception e, int[] uncommittedStatementIndexes) {
super(e);
- this.uncommittedState = uncommittedState;
- this.committedState = committedState;
- }
-
- public MutationState getUncommittedState() {
- return uncommittedState;
+ this.uncommittedStatementIndexes = uncommittedStatementIndexes;
}
- public MutationState getCommittedState() {
- return committedState;
+ /**
+ * Returns indexes of UPSERT and DELETE statements that have failed. Indexes returned
+ * correspond to each failed statement's order of creation within a {@link PhoenixConnection} up to
+ * commit/rollback.
+ * <p>
+ * Statements whose index is returned in this set correspond to one or more HBase mutations that have failed.
+ * <p>
+ * Statement indexes are maintained correctly for connections that mutate and query
+ * <b>data</b> (DELETE, UPSERT and SELECT) only. Statement (and their subsequent failure) order
+ * is undefined for connections that execute metadata operations due to the fact that Phoenix rolls
+ * back connections after metadata mutations.
+ *
+ * @see PhoenixConnection#getStatementExecutionCounter()
+ */
+ public int[] getUncommittedStatementIndexes() {
+ return uncommittedStatementIndexes;
}
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 04626a6..8053f15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.execute;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -61,9 +62,11 @@ import org.cloudera.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.sun.istack.NotNull;
/**
*
@@ -78,40 +81,32 @@ public class MutationState implements SQLCloseable {
private PhoenixConnection connection;
private final long maxSize;
private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
- private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+ private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
private long sizeOffset;
private int numRows = 0;
- public MutationState(int maxSize, PhoenixConnection connection) {
+ MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
+ this.maxSize = maxSize;
+ this.connection = connection;
+ this.mutations = mutations;
+ }
+
+ public MutationState(long maxSize, PhoenixConnection connection) {
this(maxSize,connection,0);
}
- public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
- this.maxSize = maxSize;
- this.connection = connection;
+ public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
+ this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
this.sizeOffset = sizeOffset;
}
- public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
- this.maxSize = maxSize;
- this.connection = connection;
+ public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
+ this(maxSize, connection, sizeOffset);
this.mutations.put(table, mutations);
- this.sizeOffset = sizeOffset;
this.numRows = mutations.size();
throwIfTooBig();
}
- private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
- this.maxSize = maxSize;
- this.connection = connection;
- this.sizeOffset = sizeOffset;
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
- numRows += entry.getValue().size();
- this.mutations.put(entry.getKey(), entry.getValue());
- }
- throwIfTooBig();
- }
-
private void throwIfTooBig() {
if (numRows > maxSize) {
// TODO: throw SQLException ?
@@ -134,29 +129,28 @@ public class MutationState implements SQLCloseable {
}
this.sizeOffset += newMutation.sizeOffset;
// Merge newMutation with this one, keeping state from newMutation for any overlaps
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
// Replace existing entries for the table with new entries
TableRef tableRef = entry.getKey();
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue());
+ Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue());
if (existingRows != null) { // Rows for that table already exist
// Loop through new rows and replace existing with new
- for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+ for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
// Replace existing row with new row
- Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
- if (existingValues != null) {
+ RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+ if (existingRowMutationState != null) {
+ Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
if (existingValues != PRow.DELETE_MARKER) {
- Map<PColumn,byte[]> newRow = rowEntry.getValue();
+ Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
// if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
if (newRow != PRow.DELETE_MARKER) {
- // Replace existing column values with new column values
- for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) {
- existingValues.put(valueEntry.getKey(), valueEntry.getValue());
- }
+ // Merge existing column values with new column values
+ existingRowMutationState.join(rowEntry.getValue());
// Now that the existing row has been merged with the new row, replace it back
- // again (since it was replaced with the new one above).
- existingRows.put(rowEntry.getKey(), existingValues);
+ // again (since it was merged with the new one above).
+ existingRows.put(rowEntry.getKey(), existingRowMutationState);
}
}
} else {
@@ -176,16 +170,16 @@ public class MutationState implements SQLCloseable {
throwIfTooBig();
}
- private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
+ private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
(tableRef.getTable().isImmutableRows() || includeMutableIndexes) ?
IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) :
Iterators.<PTable>emptyIterator();
final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
- Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
+ Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
+ Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
ImmutableBytesPtr key = rowEntry.getKey();
PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
List<Mutation> rowMutations, rowMutationsPertainingToIndex;
@@ -197,7 +191,7 @@ public class MutationState implements SQLCloseable {
// delete rows).
rowMutationsPertainingToIndex = Collections.emptyList();
} else {
- for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
+ for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
row.setValue(valueEntry.getKey(), valueEntry.getValue());
}
rowMutations = row.toRowMutations();
@@ -249,14 +243,14 @@ public class MutationState implements SQLCloseable {
}
public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
- final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+ final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
if (!iterator.hasNext()) {
return Iterators.emptyIterator();
}
Long scn = connection.getSCN();
final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
return new Iterator<Pair<byte[],List<Mutation>>>() {
- private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
+ private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -297,7 +291,7 @@ public class MutationState implements SQLCloseable {
Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long[] timeStamps = new long[this.mutations.size()];
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
TableRef tableRef = entry.getKey();
long serverTimeStamp = tableRef.getTimeStamp();
PTable table = tableRef.getTable();
@@ -312,12 +306,15 @@ public class MutationState implements SQLCloseable {
// TODO: use bitset?
table = result.getTable();
PColumn[] columns = new PColumn[table.getColumns().size()];
- for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
- Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
- if (valueEntry != PRow.DELETE_MARKER) {
- for (PColumn column : valueEntry.keySet()) {
- columns[column.getPosition()] = column;
- }
+ for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
+ RowMutationState valueEntry = rowEntry.getValue();
+ if (valueEntry != null) {
+ Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+ if (colValues != PRow.DELETE_MARKER) {
+ for (PColumn column : colValues.keySet()) {
+ columns[column.getPosition()] = column;
+ }
+ }
}
}
for (PColumn column : columns) {
@@ -357,15 +354,14 @@ public class MutationState implements SQLCloseable {
int i = 0;
byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
long[] serverTimeStamps = validate();
- Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
- List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+ Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
// add tracing for this operation
TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
Span span = trace.getSpan();
while (iterator.hasNext()) {
- Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
- Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
+ Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
+ Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
TableRef tableRef = entry.getKey();
PTable table = tableRef.getTable();
table.getIndexMaintainers(tempPtr, connection);
@@ -425,7 +421,6 @@ public class MutationState implements SQLCloseable {
child.stop();
shouldRetry = false;
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection));
- committedList.add(entry);
} catch (Exception e) {
SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
if (inferredE != null) {
@@ -446,9 +441,7 @@ public class MutationState implements SQLCloseable {
}
e = inferredE;
}
- // Throw to client with both what was committed so far and what is left to be committed.
- // That way, client can either undo what was done or try again with what was not done.
- sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+ sqlE = new CommitException(e, getUncommittedSattementIndexes());
} finally {
try {
hTable.close();
@@ -488,7 +481,64 @@ public class MutationState implements SQLCloseable {
numRows = 0;
}
+ private int[] getUncommittedSattementIndexes() {
+ int[] result = new int[0];
+ for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
+ for (RowMutationState rowMutationState : rowMutations.values()) {
+ result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
+ }
+ }
+ return result;
+ }
+
@Override
public void close() throws SQLException {
}
+
+ public static int[] joinSortedIntArrays(int[] a, int[] b) {
+ int[] result = new int[a.length + b.length];
+ int i = 0, j = 0, k = 0, current;
+ while (i < a.length && j < b.length) {
+ current = a[i] < b[j] ? a[i++] : b[j++];
+ for ( ; i < a.length && a[i] == current; i++);
+ for ( ; j < b.length && b[j] == current; j++);
+ result[k++] = current;
+ }
+ while (i < a.length) {
+ for (current = a[i++] ; i < a.length && a[i] == current; i++);
+ result[k++] = current;
+ }
+ while (j < b.length) {
+ for (current = b[j++] ; j < b.length && b[j] == current; j++);
+ result[k++] = current;
+ }
+ return Arrays.copyOf(result, k);
+ }
+
+ public static class RowMutationState {
+ private Map<PColumn,byte[]> columnValues;
+ private int[] statementIndexes;
+
+ public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex) {
+ Preconditions.checkNotNull(columnValues);
+
+ this.columnValues = columnValues;
+ this.statementIndexes = new int[] {statementIndex};
+ }
+
+ Map<PColumn, byte[]> getColumnValues() {
+ return columnValues;
+ }
+
+ int[] getStatementIndexes() {
+ return statementIndexes;
+ }
+
+ void join(RowMutationState newRow) {
+ getColumnValues().putAll(newRow.getColumnValues());
+ statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
+ }
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index de9e323..c9ac94a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.call.CallRunner;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.CommitException;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.function.FunctionArgumentType;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -121,21 +122,21 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private final Properties info;
private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
private final Map<PDataType<?>, Format> formatters = new HashMap<>();
- private final MutationState mutationState;
+ private MutationState mutationState;
private final int mutateBatchSize;
private final Long scn;
private boolean isAutoCommit = false;
private PMetaData metaData;
private final PName tenantId;
- private final String datePattern;
+ private final String datePattern;
private final String timePattern;
private final String timestampPattern;
-
+ private int statementExecutionCounter;
private boolean isClosed = false;
private Sampler<?> sampler;
private boolean readOnly = false;
- private Map<String, String> customTracingAnnotations = emptyMap();
-
+ private Map<String, String> customTracingAnnotations = emptyMap();
+
static {
Tracing.addTraceMetricsSource();
}
@@ -150,17 +151,20 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
this.isAutoCommit = connection.isAutoCommit;
this.sampler = connection.sampler;
+ this.statementExecutionCounter = connection.statementExecutionCounter;
}
public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
this(connection.getQueryServices(), connection, scn);
this.sampler = connection.sampler;
+ this.statementExecutionCounter = connection.statementExecutionCounter;
}
public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
this.isAutoCommit = connection.isAutoCommit;
this.sampler = connection.sampler;
+ this.statementExecutionCounter = connection.statementExecutionCounter;
}
public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
@@ -233,7 +237,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
}
});
- this.mutationState = new MutationState(maxSize, this);
+ this.mutationState = newMutationState(maxSize);
this.services.addConnection(this);
// setup tracing, if its enabled
@@ -361,6 +365,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
return metaData;
}
+ protected MutationState newMutationState(int maxSize) {
+ return new MutationState(maxSize, this);
+ }
+
public MutationState getMutationState() {
return mutationState;
}
@@ -426,6 +434,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
return null;
}
}, Tracing.withTracing(this, "committing mutations"));
+ statementExecutionCounter = 0;
}
@Override
@@ -626,6 +635,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
@Override
public void rollback() throws SQLException {
mutationState.rollback(this);
+ statementExecutionCounter = 0;
}
@Override
@@ -776,4 +786,19 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
public KeyValueBuilder getKeyValueBuilder() {
return this.services.getKeyValueBuilder();
}
+
+ /**
+ * Used to track executions of {@link Statement}s and {@link PreparedStatement}s that were created from this connection before
+ * commit or rollback. 0-based. Used to associate partial save errors with SQL statements
+ * invoked by users.
+ * @see CommitException
+ * @see #incrementStatementExecutionCounter()
+ */
+ public int getStatementExecutionCounter() {
+ return statementExecutionCounter;
+ }
+
+ public void incrementStatementExecutionCounter() {
+ statementExecutionCounter++;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 25be8c0..a23484c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -54,8 +54,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
-import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.SQLCloseable;
@@ -79,8 +79,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
private final String query;
- public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException,
- IOException {
+ public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, IOException {
super(connection);
this.statement = parser.nextStatement(new ExecutableNodeFactory());
if (this.statement == null) { throw new EOFException(); }
@@ -89,7 +88,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
this.parameters = Arrays.asList(new Object[statement.getBindCount()]);
Collections.fill(parameters, BindManager.UNBOUND_PARAMETER);
}
-
+
public PhoenixPreparedStatement(PhoenixConnection connection, String query) throws SQLException {
super(connection);
this.query = query;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 4ca5bb5..c6d086a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -151,6 +151,7 @@ import com.google.common.collect.Lists;
* @since 0.1
*/
public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
+
private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class);
public enum Operation {
@@ -243,6 +244,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
setLastResultSet(rs);
setLastUpdateCount(NO_UPDATE);
setLastUpdateOperation(stmt.getOperation());
+ connection.incrementStatementExecutionCounter();
return rs;
} catch (RuntimeException e) {
// FIXME: Expression.evaluate does not throw SQLException
@@ -289,6 +291,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount());
setLastUpdateCount(lastUpdateCount);
setLastUpdateOperation(stmt.getOperation());
+ connection.incrementStatementExecutionCounter();
return lastUpdateCount;
} catch (RuntimeException e) {
// FIXME: Expression.evaluate does not throw SQLException
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
new file mode 100644
index 0000000..67c3353
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicablelaw or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class MutationStateTest {
+
+ @Test
+ public void testJoinIntArrays() {
+ // simple case
+ int[] a = new int[] {1};
+ int[] b = new int[] {2};
+ int[] result = joinSortedIntArrays(a, b);
+
+ assertEquals(2, result.length);
+ assertArrayEquals(new int[] {1,2}, result);
+
+ // empty arrays
+ a = new int[0];
+ b = new int[0];
+ result = joinSortedIntArrays(a, b);
+
+ assertEquals(0, result.length);
+ assertArrayEquals(new int[] {}, result);
+
+ // dupes between arrays
+ a = new int[] {1,2,3};
+ b = new int[] {1,2,4};
+ result = joinSortedIntArrays(a, b);
+
+ assertEquals(4, result.length);
+ assertArrayEquals(new int[] {1,2,3,4}, result);
+
+ // dupes within arrays
+ a = new int[] {1,2,2,3};
+ b = new int[] {1,2,4};
+ result = joinSortedIntArrays(a, b);
+
+ assertEquals(4, result.length);
+ assertArrayEquals(new int[] {1,2,3,4}, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa58c782/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 9947440..b64eff8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -655,7 +655,7 @@ public abstract class BaseTest {
* Create a {@link PhoenixTestDriver} and register it.
* @return an initialized and registered {@link PhoenixTestDriver}
*/
- protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+ public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
DriverManager.registerDriver(newDriver);
Driver oldDriver = DriverManager.getDriver(url);