You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/04 23:41:15 UTC
[40/50] [abbrv] phoenix git commit: Revert "Surface partial saves in
CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37"
Revert "Surface partial saves in CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37"
This reverts commit fa58c7821a2e8fce30a8c0ff6e42aa00134dbce0.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/569469a4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/569469a4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/569469a4
Branch: refs/heads/calcite
Commit: 569469a46bae57cc4d6cbbcd7e01d535560f07e2
Parents: fa58c78
Author: Eli Levine <el...@apache.org>
Authored: Fri Feb 27 11:15:28 2015 -0800
Committer: Eli Levine <el...@apache.org>
Committed: Fri Feb 27 11:15:28 2015 -0800
----------------------------------------------------------------------
.../apache/phoenix/execute/PartialCommitIT.java | 302 -------------------
.../apache/phoenix/compile/DeleteCompiler.java | 13 +-
.../apache/phoenix/compile/UpsertCompiler.java | 13 +-
.../apache/phoenix/execute/CommitException.java | 35 +--
.../apache/phoenix/execute/MutationState.java | 156 ++++------
.../apache/phoenix/jdbc/PhoenixConnection.java | 37 +--
.../phoenix/jdbc/PhoenixPreparedStatement.java | 7 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 3 -
.../phoenix/execute/MutationStateTest.java | 64 ----
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
10 files changed, 89 insertions(+), 543 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
deleted file mode 100644
index 550d7de..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Copyright 2014 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.singletonList;
-import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class PartialCommitIT {
-
- private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase();
- private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
- private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
- private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
- private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'";
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static String url;
- private static Driver driver;
- private static final Properties props = new Properties();
-
- static {
- props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
- }
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- setUpConfigForMiniCluster(conf);
- conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
- conf.setBoolean("hbase.coprocessor.abortonerror", false);
- conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
- TEST_UTIL.startMiniCluster();
- String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
- url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
- + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-
- Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
- // Must update config before starting server
- props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
- driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
- createTablesWithABitOfData();
- }
-
- private static void createTablesWithABitOfData() throws Exception {
- Properties props = new Properties();
- props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-
- try (Connection con = driver.connect(url, new Properties())) {
- Statement sta = con.createStatement();
- sta.execute("create table a_success_table (k varchar primary key, c varchar)");
- sta.execute("create table b_failure_table (k varchar primary key, c varchar)");
- sta.execute("create table c_success_table (k varchar primary key, c varchar)");
- con.commit();
- }
-
- props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
- try (Connection con = driver.connect(url, new Properties())) {
- con.setAutoCommit(false);
- Statement sta = con.createStatement();
- for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) {
- sta.execute("upsert into " + table + " values ('z', 'z')");
- sta.execute("upsert into " + table + " values ('zz', 'zz')");
- sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
- }
- con.commit();
- }
- }
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Test
- public void testNoFailure() {
- testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false,
- singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
- }
-
- @Test
- public void testUpsertFailure() {
- testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')",
- UPSERT_TO_FAIL,
- "upsert into a_success_table values ('testUpsertFailure2', 'b')"),
- 1, new int[]{1}, true,
- newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'",
- "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
- newArrayList(new Integer(2), new Integer(0)));
- }
-
- @Test
- public void testUpsertSelectFailure() throws SQLException {
- props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
- try (Connection con = driver.connect(url, new Properties())) {
- con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
- con.commit();
- }
-
- testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')",
- UPSERT_SELECT_TO_FAIL),
- 1, new int[]{1}, true,
- newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')",
- "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
- newArrayList(new Integer(2), new Integer(0)));
- }
-
- @Test
- public void testDeleteFailure() {
- testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')",
- DELETE_TO_FAIL,
- "upsert into a_success_table values ('testDeleteFailure2', 'b')"),
- 1, new int[]{1}, true,
- newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'",
- "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"),
- newArrayList(new Integer(2), new Integer(1)));
- }
-
- /**
- * {@link MutationState} keeps mutations ordered lexicographically by table name.
- */
- @Test
- public void testOrderOfMutationsIsPredicatable() {
- testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
- UPSERT_TO_FAIL,
- "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
- 2, new int[]{0,1}, true,
- newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'",
- "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'",
- "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
- newArrayList(new Integer(0), new Integer(1), new Integer(0)));
- }
-
- @Test
- public void checkThatAllStatementTypesMaintainOrderInConnection() {
- testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')",
- "upsert into a_success_table select k, c from c_success_table",
- DELETE_TO_FAIL,
- "select * from a_success_table",
- UPSERT_TO_FAIL),
- 2, new int[]{2,4}, true,
- newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
- "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
- "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"),
- newArrayList(new Integer(4), new Integer(0), new Integer(1)));
- }
-
- private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
- List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
- Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
-
- try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
- con.setAutoCommit(false);
- Statement sta = con.createStatement();
- for (String statement : statements) {
- sta.execute(statement);
- }
- try {
- con.commit();
- if (willFail) {
- fail("Expected at least one statement in the list to fail");
- } else {
- assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit()
- }
- } catch (SQLException sqle) {
- if (!willFail) {
- fail("Expected no statements to fail");
- }
- assertEquals(CommitException.class, sqle.getClass());
- int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
- assertEquals(failureCount, uncommittedStatementIndexes.length);
- assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
- }
-
- // verify data in HBase
- for (int i = 0; i < countStatementsForVerification.size(); i++) {
- String countStatement = countStatementsForVerification.get(i);
- ResultSet rs = sta.executeQuery(countStatement);
- if (!rs.next()) {
- fail("Expected a single row from count query");
- }
- assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1));
- }
- } catch (SQLException e) {
- fail(e.toString());
- }
- }
-
- private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
- Connection con = driver.connect(url, new Properties());
- PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
- final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
- return new PhoenixConnection(phxCon) {
- protected MutationState newMutationState(int maxSize) {
- return new MutationState(maxSize, this, mutations);
- };
- };
- }
-
- public static class FailingRegionObserver extends SimpleRegionObserver {
- @Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
- final Durability durability) throws HBaseIOException {
- if (shouldFailUpsert(c, put) || shouldFailDelete(c, put)) {
- // throwing anything other than instances of IOException result
- // in this coprocessor being unloaded
- // DoNotRetryIOException tells HBase not to retry this mutation
- // multiple times
- throw new DoNotRetryIOException();
- }
- }
-
- private static boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
- String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
- return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow());
- }
-
- private static boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
- String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
- return TABLE_NAME_TO_FAIL.equals(tableName) &&
- // Phoenix deletes are sent as Puts with empty values
- put.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0;
- }
- }
-
- /**
- * Used for ordering {@link MutationState#mutations} map.
- */
- private static class TableRefComparator implements Comparator<TableRef> {
- @Override
- public int compare(TableRef tr1, TableRef tr2) {
- return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 6f51a4c..322d24a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
@@ -107,8 +106,8 @@ public class DeleteCompiler {
ConnectionQueryServices services = connection.getQueryServices();
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
- Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
- Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null;
// If indexTableRef is set, we're deleting the rows from both the index table and
// the data table through a single query to save executing an additional one.
if (indexTableRef != null) {
@@ -148,11 +147,11 @@ public class DeleteCompiler {
}
table.newKey(ptr, values);
}
- mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+ mutations.put(ptr, PRow.DELETE_MARKER);
if (indexTableRef != null) {
ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
rs.getCurrentRow().getKey(indexPtr);
- indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+ indexMutations.put(indexPtr, PRow.DELETE_MARKER);
}
if (mutations.size() > maxSize) {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -430,9 +429,9 @@ public class DeleteCompiler {
// keys for our ranges
ScanRanges ranges = context.getScanRanges();
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
- Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) {
- mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+ mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER);
}
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index f172814..b21cc2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -42,7 +42,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -96,7 +95,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class UpsertCompiler {
- private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) {
+ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) {
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -115,7 +114,7 @@ public class UpsertCompiler {
}
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
table.newKey(ptr, pkValues);
- mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
+ mutation.put(ptr, columnValues);
}
private static MutationState upsertSelect(PhoenixStatement statement,
@@ -129,7 +128,7 @@ public class UpsertCompiler {
boolean isAutoCommit = connection.getAutoCommit();
byte[][] values = new byte[columnIndexes.length][];
int rowCount = 0;
- Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize);
PTable table = tableRef.getTable();
ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -157,7 +156,7 @@ public class UpsertCompiler {
column.getMaxLength(), column.getScale(), column.getSortOrder());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
if (isAutoCommit && rowCount % batchSize == 0) {
@@ -803,8 +802,8 @@ public class UpsertCompiler {
throw new IllegalStateException();
}
}
- Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
- setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement);
+ Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
+ setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation);
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..63bf6a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -19,32 +19,23 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
public class CommitException extends SQLException {
- private static final long serialVersionUID = 2L;
- private final int[] uncommittedStatementIndexes;
+ private static final long serialVersionUID = 1L;
+ private final MutationState uncommittedState;
+ private final MutationState committedState;
- public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+ public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
super(e);
- this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+ this.uncommittedState = uncommittedState;
+ this.committedState = committedState;
+ }
+
+ public MutationState getUncommittedState() {
+ return uncommittedState;
}
- /**
- * Returns indexes of UPSERT and DELETE statements that have failed. Indexes returned
- * correspond to each failed statement's order of creation within a {@link PhoenixConnection} up to
- * commit/rollback.
- * <p>
- * Statements whose index is returned in this set correspond to one or more HBase mutations that have failed.
- * <p>
- * Statement indexes are maintained correctly for connections that mutate and query
- * <b>data</b> (DELETE, UPSERT and SELECT) only. Statement (and their subsequent failure) order
- * is undefined for connections that execute metadata operations due to the fact that Phoenix rolls
- * back connections after metadata mutations.
- *
- * @see PhoenixConnection#getStatementExecutionCounter()
- */
- public int[] getUncommittedStatementIndexes() {
- return uncommittedStatementIndexes;
+ public MutationState getCommittedState() {
+ return committedState;
}
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8053f15..04626a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.execute;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -62,11 +61,9 @@ import org.cloudera.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.sun.istack.NotNull;
/**
*
@@ -81,32 +78,40 @@ public class MutationState implements SQLCloseable {
private PhoenixConnection connection;
private final long maxSize;
private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
- private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+ private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
private long sizeOffset;
private int numRows = 0;
- MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
- this.maxSize = maxSize;
- this.connection = connection;
- this.mutations = mutations;
- }
-
- public MutationState(long maxSize, PhoenixConnection connection) {
+ public MutationState(int maxSize, PhoenixConnection connection) {
this(maxSize,connection,0);
}
- public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
- this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
+ public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
+ this.maxSize = maxSize;
+ this.connection = connection;
this.sizeOffset = sizeOffset;
}
- public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
- this(maxSize, connection, sizeOffset);
+ public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
+ this.maxSize = maxSize;
+ this.connection = connection;
this.mutations.put(table, mutations);
+ this.sizeOffset = sizeOffset;
this.numRows = mutations.size();
throwIfTooBig();
}
+ private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
+ this.maxSize = maxSize;
+ this.connection = connection;
+ this.sizeOffset = sizeOffset;
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
+ numRows += entry.getValue().size();
+ this.mutations.put(entry.getKey(), entry.getValue());
+ }
+ throwIfTooBig();
+ }
+
private void throwIfTooBig() {
if (numRows > maxSize) {
// TODO: throw SQLException ?
@@ -129,28 +134,29 @@ public class MutationState implements SQLCloseable {
}
this.sizeOffset += newMutation.sizeOffset;
// Merge newMutation with this one, keeping state from newMutation for any overlaps
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
// Replace existing entries for the table with new entries
TableRef tableRef = entry.getKey();
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
- Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue());
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue());
if (existingRows != null) { // Rows for that table already exist
// Loop through new rows and replace existing with new
- for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
+ for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
// Replace existing row with new row
- RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
- if (existingRowMutationState != null) {
- Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
+ Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+ if (existingValues != null) {
if (existingValues != PRow.DELETE_MARKER) {
- Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
+ Map<PColumn,byte[]> newRow = rowEntry.getValue();
// if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
if (newRow != PRow.DELETE_MARKER) {
- // Merge existing column values with new column values
- existingRowMutationState.join(rowEntry.getValue());
+ // Replace existing column values with new column values
+ for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) {
+ existingValues.put(valueEntry.getKey(), valueEntry.getValue());
+ }
// Now that the existing row has been merged with the new row, replace it back
- // again (since it was merged with the new one above).
- existingRows.put(rowEntry.getKey(), existingRowMutationState);
+ // again (since it was replaced with the new one above).
+ existingRows.put(rowEntry.getKey(), existingValues);
}
}
} else {
@@ -170,16 +176,16 @@ public class MutationState implements SQLCloseable {
throwIfTooBig();
}
- private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
+ private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
(tableRef.getTable().isImmutableRows() || includeMutableIndexes) ?
IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) :
Iterators.<PTable>emptyIterator();
final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
- Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
+ Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
+ Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
ImmutableBytesPtr key = rowEntry.getKey();
PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
List<Mutation> rowMutations, rowMutationsPertainingToIndex;
@@ -191,7 +197,7 @@ public class MutationState implements SQLCloseable {
// delete rows).
rowMutationsPertainingToIndex = Collections.emptyList();
} else {
- for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
+ for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
row.setValue(valueEntry.getKey(), valueEntry.getValue());
}
rowMutations = row.toRowMutations();
@@ -243,14 +249,14 @@ public class MutationState implements SQLCloseable {
}
public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
- final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+ final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
if (!iterator.hasNext()) {
return Iterators.emptyIterator();
}
Long scn = connection.getSCN();
final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
return new Iterator<Pair<byte[],List<Mutation>>>() {
- private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
+ private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -291,7 +297,7 @@ public class MutationState implements SQLCloseable {
Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long[] timeStamps = new long[this.mutations.size()];
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
TableRef tableRef = entry.getKey();
long serverTimeStamp = tableRef.getTimeStamp();
PTable table = tableRef.getTable();
@@ -306,15 +312,12 @@ public class MutationState implements SQLCloseable {
// TODO: use bitset?
table = result.getTable();
PColumn[] columns = new PColumn[table.getColumns().size()];
- for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
- RowMutationState valueEntry = rowEntry.getValue();
- if (valueEntry != null) {
- Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
- if (colValues != PRow.DELETE_MARKER) {
- for (PColumn column : colValues.keySet()) {
- columns[column.getPosition()] = column;
- }
- }
+ for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+ Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
+ if (valueEntry != PRow.DELETE_MARKER) {
+ for (PColumn column : valueEntry.keySet()) {
+ columns[column.getPosition()] = column;
+ }
}
}
for (PColumn column : columns) {
@@ -354,14 +357,15 @@ public class MutationState implements SQLCloseable {
int i = 0;
byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
long[] serverTimeStamps = validate();
- Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+ Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+ List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
// add tracing for this operation
TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
Span span = trace.getSpan();
while (iterator.hasNext()) {
- Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
- Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
+ Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
+ Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
TableRef tableRef = entry.getKey();
PTable table = tableRef.getTable();
table.getIndexMaintainers(tempPtr, connection);
@@ -421,6 +425,7 @@ public class MutationState implements SQLCloseable {
child.stop();
shouldRetry = false;
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection));
+ committedList.add(entry);
} catch (Exception e) {
SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
if (inferredE != null) {
@@ -441,7 +446,9 @@ public class MutationState implements SQLCloseable {
}
e = inferredE;
}
- sqlE = new CommitException(e, getUncommittedSattementIndexes());
+ // Throw to client with both what was committed so far and what is left to be committed.
+ // That way, client can either undo what was done or try again with what was not done.
+ sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
} finally {
try {
hTable.close();
@@ -481,64 +488,7 @@ public class MutationState implements SQLCloseable {
numRows = 0;
}
- private int[] getUncommittedSattementIndexes() {
- int[] result = new int[0];
- for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
- for (RowMutationState rowMutationState : rowMutations.values()) {
- result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
- }
- }
- return result;
- }
-
@Override
public void close() throws SQLException {
}
-
- public static int[] joinSortedIntArrays(int[] a, int[] b) {
- int[] result = new int[a.length + b.length];
- int i = 0, j = 0, k = 0, current;
- while (i < a.length && j < b.length) {
- current = a[i] < b[j] ? a[i++] : b[j++];
- for ( ; i < a.length && a[i] == current; i++);
- for ( ; j < b.length && b[j] == current; j++);
- result[k++] = current;
- }
- while (i < a.length) {
- for (current = a[i++] ; i < a.length && a[i] == current; i++);
- result[k++] = current;
- }
- while (j < b.length) {
- for (current = b[j++] ; j < b.length && b[j] == current; j++);
- result[k++] = current;
- }
- return Arrays.copyOf(result, k);
- }
-
- public static class RowMutationState {
- private Map<PColumn,byte[]> columnValues;
- private int[] statementIndexes;
-
- public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex) {
- Preconditions.checkNotNull(columnValues);
-
- this.columnValues = columnValues;
- this.statementIndexes = new int[] {statementIndex};
- }
-
- Map<PColumn, byte[]> getColumnValues() {
- return columnValues;
- }
-
- int[] getStatementIndexes() {
- return statementIndexes;
- }
-
- void join(RowMutationState newRow) {
- getColumnValues().putAll(newRow.getColumnValues());
- statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
- }
-
-
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index c9ac94a..de9e323 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.call.CallRunner;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.CommitException;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.function.FunctionArgumentType;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -122,21 +121,21 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private final Properties info;
private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
private final Map<PDataType<?>, Format> formatters = new HashMap<>();
- private MutationState mutationState;
+ private final MutationState mutationState;
private final int mutateBatchSize;
private final Long scn;
private boolean isAutoCommit = false;
private PMetaData metaData;
private final PName tenantId;
- private final String datePattern;
+ private final String datePattern;
private final String timePattern;
private final String timestampPattern;
- private int statementExecutionCounter;
+
private boolean isClosed = false;
private Sampler<?> sampler;
private boolean readOnly = false;
- private Map<String, String> customTracingAnnotations = emptyMap();
-
+ private Map<String, String> customTracingAnnotations = emptyMap();
+
static {
Tracing.addTraceMetricsSource();
}
@@ -151,20 +150,17 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
this.isAutoCommit = connection.isAutoCommit;
this.sampler = connection.sampler;
- this.statementExecutionCounter = connection.statementExecutionCounter;
}
public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
this(connection.getQueryServices(), connection, scn);
this.sampler = connection.sampler;
- this.statementExecutionCounter = connection.statementExecutionCounter;
}
public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
this.isAutoCommit = connection.isAutoCommit;
this.sampler = connection.sampler;
- this.statementExecutionCounter = connection.statementExecutionCounter;
}
public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
@@ -237,7 +233,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
}
});
- this.mutationState = newMutationState(maxSize);
+ this.mutationState = new MutationState(maxSize, this);
this.services.addConnection(this);
// setup tracing, if its enabled
@@ -365,10 +361,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
return metaData;
}
- protected MutationState newMutationState(int maxSize) {
- return new MutationState(maxSize, this);
- }
-
public MutationState getMutationState() {
return mutationState;
}
@@ -434,7 +426,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
return null;
}
}, Tracing.withTracing(this, "committing mutations"));
- statementExecutionCounter = 0;
}
@Override
@@ -635,7 +626,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
@Override
public void rollback() throws SQLException {
mutationState.rollback(this);
- statementExecutionCounter = 0;
}
@Override
@@ -786,19 +776,4 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
public KeyValueBuilder getKeyValueBuilder() {
return this.services.getKeyValueBuilder();
}
-
- /**
- * Used to track executions of {@link Statement}s and {@link PreparedStatement}s that were created from this connection before
- * commit or rollback. 0-based. Used to associate partial save errors with SQL statements
- * invoked by users.
- * @see CommitException
- * @see #incrementStatementExecutionCounter()
- */
- public int getStatementExecutionCounter() {
- return statementExecutionCounter;
- }
-
- public void incrementStatementExecutionCounter() {
- statementExecutionCounter++;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index a23484c..25be8c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -54,8 +54,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
-import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.SQLCloseable;
@@ -79,7 +79,8 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
private final String query;
- public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, IOException {
+ public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException,
+ IOException {
super(connection);
this.statement = parser.nextStatement(new ExecutableNodeFactory());
if (this.statement == null) { throw new EOFException(); }
@@ -88,7 +89,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
this.parameters = Arrays.asList(new Object[statement.getBindCount()]);
Collections.fill(parameters, BindManager.UNBOUND_PARAMETER);
}
-
+
public PhoenixPreparedStatement(PhoenixConnection connection, String query) throws SQLException {
super(connection);
this.query = query;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c6d086a..4ca5bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -151,7 +151,6 @@ import com.google.common.collect.Lists;
* @since 0.1
*/
public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
-
private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class);
public enum Operation {
@@ -244,7 +243,6 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
setLastResultSet(rs);
setLastUpdateCount(NO_UPDATE);
setLastUpdateOperation(stmt.getOperation());
- connection.incrementStatementExecutionCounter();
return rs;
} catch (RuntimeException e) {
// FIXME: Expression.evaluate does not throw SQLException
@@ -291,7 +289,6 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount());
setLastUpdateCount(lastUpdateCount);
setLastUpdateOperation(stmt.getOperation());
- connection.incrementStatementExecutionCounter();
return lastUpdateCount;
} catch (RuntimeException e) {
// FIXME: Expression.evaluate does not throw SQLException
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
deleted file mode 100644
index 67c3353..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class MutationStateTest {
-
- @Test
- public void testJoinIntArrays() {
- // simple case
- int[] a = new int[] {1};
- int[] b = new int[] {2};
- int[] result = joinSortedIntArrays(a, b);
-
- assertEquals(2, result.length);
- assertArrayEquals(new int[] {1,2}, result);
-
- // empty arrays
- a = new int[0];
- b = new int[0];
- result = joinSortedIntArrays(a, b);
-
- assertEquals(0, result.length);
- assertArrayEquals(new int[] {}, result);
-
- // dupes between arrays
- a = new int[] {1,2,3};
- b = new int[] {1,2,4};
- result = joinSortedIntArrays(a, b);
-
- assertEquals(4, result.length);
- assertArrayEquals(new int[] {1,2,3,4}, result);
-
- // dupes within arrays
- a = new int[] {1,2,2,3};
- b = new int[] {1,2,4};
- result = joinSortedIntArrays(a, b);
-
- assertEquals(4, result.length);
- assertArrayEquals(new int[] {1,2,3,4}, result);
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b64eff8..9947440 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -655,7 +655,7 @@ public abstract class BaseTest {
* Create a {@link PhoenixTestDriver} and register it.
* @return an initialized and registered {@link PhoenixTestDriver}
*/
- public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+ protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
DriverManager.registerDriver(newDriver);
Driver oldDriver = DriverManager.getDriver(url);