Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/12/31 02:53:26 UTC
[1/7] phoenix git commit: PHOENIX-2545 Abort transaction if send fails during commit
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 0f7dca652 -> 8b3ca756c
PHOENIX-2545 Abort transaction if send fails during commit
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e4d39e6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e4d39e6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e4d39e6
Branch: refs/heads/4.x-HBase-0.98
Commit: 7e4d39e637d1ba765abed9e1d6ad73295575d2b9
Parents: 0f7dca6
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Dec 26 23:47:31 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:38:08 2015 -0800
----------------------------------------------------------------------
.../apache/phoenix/end2end/CreateTableIT.java | 28 +++
.../apache/phoenix/end2end/UpsertSelectIT.java | 2 +-
.../apache/phoenix/execute/PartialCommitIT.java | 44 ++--
.../coprocessor/MetaDataEndpointImpl.java | 13 +-
.../apache/phoenix/execute/MutationState.java | 216 ++++++++++++-------
.../index/PhoenixTransactionalIndexer.java | 20 +-
6 files changed, 207 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 7c4576c..5ffc354 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.Test;
@@ -407,4 +408,31 @@ public class CreateTableIT extends BaseClientManagedTimeIT {
assertEquals(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL.getErrorCode(),sqle.getErrorCode());
}
}
+
+ @Test
+ public void testAlterDeletedTable() throws Exception {
+ String ddl = "create table T ("
+ + " K varchar primary key,"
+ + " V1 varchar)";
+ long ts = nextTimestamp();
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.createStatement().execute(ddl);
+ conn.close();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50));
+ Connection connAt50 = DriverManager.getConnection(getUrl(), props);
+ connAt50.createStatement().execute("DROP TABLE T");
+ connAt50.close();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+20));
+ Connection connAt20 = DriverManager.getConnection(getUrl(), props);
+ connAt20.createStatement().execute("UPDATE STATISTICS T"); // Invalidates from cache
+ try {
+ connAt20.createStatement().execute("ALTER TABLE T ADD V2 VARCHAR");
+ fail();
+ } catch (NewerTableAlreadyExistsException e) {
+
+ }
+ connAt20.close();
+ }
}
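
For readers new to Phoenix's client-managed timestamps: each connection in the test above is opened "as of" a specific SCN via the CURRENT_SCN_ATTRIB property, which is how the DROP at ts+50 can logically precede the ALTER attempted at ts+20. A minimal sketch of the pattern, assuming an illustrative local connection URL and timestamp:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class ScnConnectionSketch {
        public static void main(String[] args) throws Exception {
            // Pin the connection to a specific timestamp (SCN); all reads and
            // writes on this connection happen "as of" that time.
            Properties props = new Properties();
            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(1000L));
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                conn.createStatement().execute("UPSERT INTO T (K, V1) VALUES ('k1', 'v1')");
                conn.commit();
            }
        }
    }
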
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 689562a..b5252e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1109,7 +1109,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
}
// upsert data into base table without specifying the row timestamp column PK2
- long upsertedTs = 5;
+ long upsertedTs = nextTimestamp();
try (Connection conn = getConnection(upsertedTs)) {
// Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
// creating a new row whose timestamp is the SCN of the connection. The same SCN will be used
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 61aae62..8d7ebcb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -100,7 +100,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
return Arrays.asList(false, true);
}
+ private final boolean transactional;
+
public PartialCommitIT(boolean transactional) {
+ this.transactional = transactional;
if (transactional) {
A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
@@ -148,8 +151,8 @@ public class PartialCommitIT extends BaseOwnClusterIT {
@Test
public void testNoFailure() {
- testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), 0, new int[0], false,
- singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1)));
+ testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"),
+ singletonList(new Integer(1)));
}
@Test
@@ -157,10 +160,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')",
UPSERT_TO_FAIL,
"upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"),
- 1, new int[]{1}, true,
+ transactional ? new int[] {0,1,2} : new int[]{1}, true,
newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
- "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
- newArrayList(new Integer(2), new Integer(0)));
+ "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+ transactional ? newArrayList(new Integer(0), new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
}
@Test
@@ -172,10 +175,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')",
UPSERT_SELECT_TO_FAIL),
- 1, new int[]{1}, true,
+ transactional ? new int[] {0,1} : new int[]{1}, true,
newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
- "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
- newArrayList(new Integer(2), new Integer(0)));
+ "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+ transactional ? newArrayList(new Integer(1) /* from commit above */, new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
}
@Test
@@ -183,10 +186,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')",
DELETE_TO_FAIL,
"upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"),
- 1, new int[]{1}, true,
+ transactional ? new int[] {0,1,2} : new int[]{1}, true,
newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
- "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
- newArrayList(new Integer(2), new Integer(1)));
+ "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+ transactional ? newArrayList(new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(2), new Integer(1)));
}
/**
@@ -197,11 +200,11 @@ public class PartialCommitIT extends BaseOwnClusterIT {
testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
UPSERT_TO_FAIL,
"upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
- 2, new int[]{0,1}, true,
+ transactional ? new int[] {0,1,2} : new int[]{0,1}, true,
newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
"select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
- "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
- newArrayList(new Integer(0), new Integer(1), new Integer(0)));
+ "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+ transactional ? newArrayList(new Integer(0), new Integer(0), new Integer(0)) : newArrayList(new Integer(0), new Integer(1), new Integer(0)));
}
@Test
@@ -211,15 +214,15 @@ public class PartialCommitIT extends BaseOwnClusterIT {
DELETE_TO_FAIL,
"select * from " + A_SUCESS_TABLE + "",
UPSERT_TO_FAIL),
- 2, new int[]{2,4}, true,
+ transactional ? new int[] {0,1,2,4} : new int[]{2,4}, true,
newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
"select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'",
- "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
- newArrayList(new Integer(4), new Integer(0), new Integer(1)));
+ "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+ transactional ? newArrayList(new Integer(3) /* original rows */, new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(4), new Integer(0), new Integer(1)));
}
- private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
- List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+ private void testPartialCommit(List<String> statements, int[] expectedUncommittedStatementIndexes, boolean willFail, List<String> countStatementsForVerification,
+ List<Integer> expectedCountsForVerification) {
Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
@@ -241,7 +244,6 @@ public class PartialCommitIT extends BaseOwnClusterIT {
}
assertEquals(CommitException.class, sqle.getClass());
int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
- assertEquals(failureCount, uncommittedStatementIndexes.length);
assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
}
@@ -267,7 +269,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
return new PhoenixConnection(phxCon, null) {
@Override
protected MutationState newMutationState(int maxSize) {
- return new MutationState(maxSize, this, mutations, null);
+ return new MutationState(maxSize, this, mutations, null, null);
};
};
}
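
The dropped failureCount argument reflects the new contract being tested: on failure, CommitException carries the complete set of uncommitted statement indexes, and for a transactional table that set covers every statement in the transaction, since none of its writes survive the abort. A hedged sketch of how client code might consume this (the URL and statements are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.phoenix.execute.CommitException;

    public class CommitFailureSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(false);
                conn.createStatement().execute("upsert into A_TABLE values ('k', 'v')");
                try {
                    conn.commit();
                } catch (CommitException e) {
                    // 0-based indexes, in execution order, of the statements whose
                    // writes were not committed; for a transactional table every
                    // statement in the transaction shows up here.
                    for (int idx : e.getUncommittedStatementIndexes()) {
                        System.err.println("uncommitted statement #" + idx);
                    }
                }
            }
        }
    }
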
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 1280baa..2e931fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -977,13 +977,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
List<Cell> results = Lists.<Cell> newArrayList();
- try (RegionScanner scanner = region.getScanner(scan);) {
+ try (RegionScanner scanner = region.getScanner(scan)) {
scanner.next(results);
}
- // HBase ignores the time range on a raw scan (HBASE-7362)
- if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
- Cell kv = results.get(0);
- if (kv.getTypeByte() == Type.Delete.getCode()) {
+ for (Cell kv : results) {
+ KeyValue.Type type = Type.codeToType(kv.getTypeByte());
+ if (type == Type.DeleteFamily) { // Row was deleted
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = newDeletedTableMarker(kv.getTimestamp());
@@ -1621,7 +1620,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
&& (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
// if not found then call newerTableExists and add delete marker for timestamp
// found
- if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
+ table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
+ if (table != null) {
+ logger.info("Found newer table deleted as of " + table.getTimeStamp());
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
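
Scanning every returned cell for a DeleteFamily marker, rather than peeking only at the first one, matters because HBase ignores the time range on a raw scan (HBASE-7362), so the newest cell is not necessarily the delete marker the code is looking for. A rough client-side sketch of the same detection pattern against plain HBase 0.98 APIs (the table handle and row key are illustrative):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeletedRowCheckSketch {
        // Returns the timestamp of the row's DeleteFamily marker, or -1 if none.
        static long deletedAsOf(HTable table, byte[] row) throws Exception {
            Scan scan = new Scan(row);
            scan.setRaw(true);          // raw scans return delete markers too
            scan.setMaxVersions();
            scan.setFilter(new FirstKeyOnlyFilter());
            try (ResultScanner scanner = table.getScanner(scan)) {
                Result result = scanner.next();
                if (result == null || !Bytes.equals(result.getRow(), row)) {
                    return -1;
                }
                for (Cell kv : result.rawCells()) {
                    // A DeleteFamily marker means the row was deleted as of
                    // the marker's timestamp.
                    if (KeyValue.Type.codeToType(kv.getTypeByte()) == KeyValue.Type.DeleteFamily) {
                        return kv.getTimestamp();
                    }
                }
            }
            return -1;
        }
    }
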
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 195dce3..eb2c77b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -68,6 +69,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.types.PLong;
@@ -109,8 +111,9 @@ import co.cask.tephra.hbase98.TransactionAwareHTable;
public class MutationState implements SQLCloseable {
private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
private static final TransactionCodec CODEC = new TransactionCodec();
+ private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
- private PhoenixConnection connection;
+ private final PhoenixConnection connection;
private final long maxSize;
private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
private final List<TransactionAware> txAwares;
@@ -120,35 +123,39 @@ public class MutationState implements SQLCloseable {
private Transaction tx;
private long sizeOffset;
private int numRows = 0;
- private boolean txStarted = false;
+ private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private final MutationMetricQueue mutationMetricQueue;
private ReadMetricQueue readMetricQueue;
public MutationState(long maxSize, PhoenixConnection connection) {
- this(maxSize,connection, null);
+ this(maxSize,connection, null, null);
+ }
+
+ public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) {
+ this(maxSize,connection, null, txContext);
}
public MutationState(MutationState mutationState) {
- this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction());
+ this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null);
}
public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
- this(maxSize, connection, null, sizeOffset);
+ this(maxSize, connection, null, null, sizeOffset);
}
- private MutationState(long maxSize, PhoenixConnection connection, Transaction tx) {
- this(maxSize,connection, tx, 0);
+ private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
+ this(maxSize,connection, tx, txContext, 0);
}
- private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
- this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx);
+ private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
+ this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx, txContext);
this.sizeOffset = sizeOffset;
}
MutationState(long maxSize, PhoenixConnection connection,
Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
- Transaction tx) {
+ Transaction tx, TransactionContext txContext) {
this.maxSize = maxSize;
this.connection = connection;
this.mutations = mutations;
@@ -157,26 +164,34 @@ public class MutationState implements SQLCloseable {
: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
this.tx = tx;
if (tx == null) {
- this.txAwares = Collections.emptyList();
- TransactionSystemClient txServiceClient = this.connection
- .getQueryServices().getTransactionSystemClient();
- this.txContext = new TransactionContext(txServiceClient);
+ this.txAwares = Collections.emptyList();
+ if (txContext == null) {
+ TransactionSystemClient txServiceClient = this.connection
+ .getQueryServices().getTransactionSystemClient();
+ this.txContext = new TransactionContext(txServiceClient);
+ } else {
+ this.txContext = txContext;
+ }
} else {
// this code path is only used while running child scans, we can't pass the txContext to child scans
// as it is not thread safe, so we use the tx member variable
- txAwares = Lists.newArrayList();
- txContext = null;
+ this.txAwares = Lists.newArrayList();
+ this.txContext = null;
}
}
public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
- this(maxSize, connection, null, sizeOffset);
+ this(maxSize, connection, null, null, sizeOffset);
this.mutations.put(table, mutations);
this.numRows = mutations.size();
this.tx = connection.getMutationState().getTransaction();
throwIfTooBig();
}
+ public long getMaxSize() {
+ return maxSize;
+ }
+
public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
Transaction currentTx = getTransaction();
if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
@@ -308,9 +323,8 @@ public class MutationState implements SQLCloseable {
}
try {
- if (!txStarted) {
+ if (!isTransactionStarted()) {
txContext.start();
- txStarted = true;
return true;
}
} catch (TransactionFailureException e) {
@@ -320,7 +334,7 @@ public class MutationState implements SQLCloseable {
}
public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
- MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null);
+ MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
state.sizeOffset = 0;
return state;
}
@@ -599,18 +613,24 @@ public class MutationState implements SQLCloseable {
Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long serverTimeStamp = tableRef.getTimeStamp();
- PTable table = tableRef.getTable();
// If we're auto committing, we've already validated the schema when we got the ColumnResolver,
// so no need to do it again here.
if (!connection.getAutoCommit()) {
+ PTable table = tableRef.getTable();
MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+ PTable resolvedTable = result.getTable();
+ if (resolvedTable == null) {
+ throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
+ }
+ // Always update tableRef table as the one we've cached may be out of date since when we executed
+ // the UPSERT VALUES call and updated in the cache before this.
+ tableRef.setTable(resolvedTable);
long timestamp = result.getMutationTime();
if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
serverTimeStamp = timestamp;
if (result.wasUpdated()) {
// TODO: use bitset?
- table = result.getTable();
- PColumn[] columns = new PColumn[table.getColumns().size()];
+ PColumn[] columns = new PColumn[resolvedTable.getColumns().size()];
for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
RowMutationState valueEntry = rowEntry.getValue();
if (valueEntry != null) {
@@ -624,10 +644,9 @@ public class MutationState implements SQLCloseable {
}
for (PColumn column : columns) {
if (column != null) {
- table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+ resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
}
}
- tableRef.setTable(table);
}
}
}
@@ -731,25 +750,28 @@ public class MutationState implements SQLCloseable {
sendAll = true;
}
+ List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size());
// add tracing for this operation
try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
Span span = trace.getSpan();
ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+ boolean isTransactional;
while (tableRefIterator.hasNext()) {
// at this point we are going through mutations for each table
- TableRef tableRef = tableRefIterator.next();
+ final TableRef tableRef = tableRefIterator.next();
Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
if (valuesMap == null || valuesMap.isEmpty()) {
continue;
}
- PTable table = tableRef.getTable();
+ // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+ long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+ final PTable table = tableRef.getTable();
// Track tables to which we've sent uncommitted data
- if (table.isTransactional()) {
+ if (isTransactional = table.isTransactional()) {
+ txTableRefs.add(tableRef);
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
}
boolean isDataTable = true;
- // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
- long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
table.getIndexMaintainers(indexMetaDataPtr, connection);
Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
while (mutationsIterator.hasNext()) {
@@ -776,7 +798,7 @@ public class MutationState implements SQLCloseable {
SQLException sqlE = null;
HTableInterface hTable = connection.getQueryServices().getTable(htableName);
try {
- if (table.isTransactional()) {
+ if (isTransactional) {
// If we have indexes, wrap the HTable in a delegate HTable that
// will attach the necessary index meta data in the event of a
// rollback
@@ -830,8 +852,8 @@ public class MutationState implements SQLCloseable {
}
e = inferredE;
}
- // Throw to client with both what was committed so far and what is left to be committed.
- // That way, client can either undo what was done or try again with what was not done.
+ // Throw to client an exception that indicates the statements that
+ // were not committed successfully.
sqlE = new CommitException(e, getUncommittedStatementIndexes());
} finally {
try {
@@ -862,17 +884,24 @@ public class MutationState implements SQLCloseable {
if (tableRef.getTable().getType() != PTableType.INDEX) {
numRows -= valuesMap.size();
}
- // Remove batches as we process them
+ // For transactions, track the statement indexes as we send data
+ // over because our CommitException should include all statements
+ // involved in the transaction since none of them would have been
+ // committed in the event of a failure.
+ if (isTransactional) {
+ addUncommittedStatementIndexes(valuesMap.values());
+ }
+ // Remove batches as we process them
if (sendAll) {
- tableRefIterator.remove(); // Iterating through actual map in this case
+ // Iterating through map key set in this case, so we cannot use
+ // the remove method without getting a concurrent modification
+ // exception.
+ tableRefIterator.remove();
} else {
mutations.remove(tableRef);
}
}
}
- // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
- // now to only send the mutations for the tables we're querying, hence we've removed the
- // assertions that we're here before.
}
public byte[] encodeTransaction() throws SQLException {
@@ -930,19 +959,17 @@ public class MutationState implements SQLCloseable {
return cache;
}
- private void clear() throws SQLException {
- this.mutations.clear();
- numRows = 0;
+ private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) {
+ for (RowMutationState rowMutationState : rowMutations) {
+ uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, rowMutationState.getStatementIndexes());
+ }
}
private int[] getUncommittedStatementIndexes() {
- int[] result = new int[0];
- for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
- for (RowMutationState rowMutationState : rowMutations.values()) {
- result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
- }
+ for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+ addUncommittedStatementIndexes(rowMutationMap.values());
}
- return result;
+ return uncommittedStatementIndexes;
}
@Override
@@ -950,65 +977,96 @@ public class MutationState implements SQLCloseable {
}
private void reset() {
- txStarted = false;
tx = null;
+ txAwares.clear();
uncommittedPhysicalNames.clear();
+ this.mutations.clear();
+ numRows = 0;
+ uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
}
public void rollback() throws SQLException {
- clear();
- txAwares.clear();
- if (txContext != null) {
- try {
- if (txStarted) {
+ try {
+ if (txContext != null && isTransactionStarted()) {
+ try {
txContext.abort();
+ } catch (TransactionFailureException e) {
+ throw TransactionUtil.getTransactionFailureException(e);
}
- } catch (TransactionFailureException e) {
- throw new SQLException(e); // TODO: error code
- } finally {
- reset();
}
+ } finally {
+ reset();
}
}
public void commit() throws SQLException {
- boolean sendMutationsFailed=false;
+ boolean sendSuccessful=false;
+ SQLException sqlE = null;
try {
send();
- } catch (Throwable t) {
- sendMutationsFailed=true;
- throw t;
+ sendSuccessful=true;
+ } catch (SQLException e) {
+ sqlE = e;
} finally {
- txAwares.clear();
- if (txContext != null) {
- try {
- if (txStarted && !sendMutationsFailed) {
- txContext.finish();
- }
- } catch (TransactionFailureException e) {
+ try {
+ if (txContext != null && isTransactionStarted()) {
+ TransactionFailureException txFailure = null;
+ boolean finishSuccessful=false;
try {
- txContext.abort(e);
- // abort and throw the original commit failure exception
- throw TransactionUtil.getTransactionFailureException(e);
- } catch (TransactionFailureException e1) {
- // if abort fails and throw the abort failure exception
- throw TransactionUtil.getTransactionFailureException(e1);
+ if (sendSuccessful) {
+ txContext.finish();
+ finishSuccessful = true;
+ }
+ } catch (TransactionFailureException e) {
+ txFailure = e;
+ SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+ if (sqlE == null) {
+ sqlE = nextE;
+ } else {
+ sqlE.setNextException(nextE);
+ }
+ } finally {
+ // If send fails or finish fails, abort the tx
+ if (!finishSuccessful) {
+ try {
+ txContext.abort(txFailure);
+ } catch (TransactionFailureException e) {
+ SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+ if (sqlE == null) {
+ sqlE = nextE;
+ } else {
+ sqlE.setNextException(nextE);
+ }
+ }
+ }
}
+ }
+ } finally {
+ try {
+ reset();
} finally {
- if (!sendMutationsFailed) {
- reset();
- }
- }
+ if (sqlE != null) {
+ throw sqlE;
+ }
+ }
}
}
}
/**
+ * Send to HBase any uncommitted data for transactional tables.
+ * @return true if any data was sent and false otherwise.
+ * @throws SQLException
+ */
+ public boolean sendUncommitted() throws SQLException {
+ return sendUncommitted(mutations.keySet().iterator());
+ }
+ /**
* Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a
* query. In this way, they are visible to subsequent reads but are not actually committed until
* commit is called.
* @param tableRefs
- * @return true if at least partially transactional and false otherwise.
+ * @return true if any data was sent and false otherwise.
* @throws SQLException
*/
public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {
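
The reworked commit() above reduces to one invariant: if either the send or the transaction finish fails, the transaction must be aborted, and every failure along the way is surfaced through a single chained SQLException rather than swallowed. A stripped-down sketch of that commit-or-abort shape against the Tephra API, where doWork stands in for Phoenix's send() and is purely illustrative:

    import co.cask.tephra.TransactionContext;
    import co.cask.tephra.TransactionFailureException;

    public class CommitOrAbortSketch {
        // Run work inside a transaction; abort on any failure so no
        // partially-sent writes remain visible.
        static void execute(TransactionContext txContext, Runnable doWork)
                throws TransactionFailureException {
            txContext.start();
            boolean finishSuccessful = false;
            TransactionFailureException txFailure = null;
            try {
                doWork.run();           // analogous to MutationState.send()
                txContext.finish();     // commit: makes the writes visible
                finishSuccessful = true;
            } catch (TransactionFailureException e) {
                txFailure = e;
                throw e;
            } finally {
                if (!finishSuccessful) {
                    // Send or finish failed: tell the transaction manager to
                    // invalidate this transaction's writes.
                    txContext.abort(txFailure);
                }
            }
        }
    }
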
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 75ad63d..0e00cc6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -49,9 +49,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.TraceScope;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.hbase.index.MultiMutation;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -70,17 +67,20 @@ import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
/**
* Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction
* manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a
@@ -196,7 +196,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
// run a scan to get the current state. We'll need this to delete
// the existing index rows.
Transaction tx = indexMetaData.getTransaction();
- assert(tx != null);
+ if (tx == null) {
+ throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
+ }
List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
for (IndexMaintainer indexMaintainer : indexMaintainers) {
[7/7] phoenix git commit: PHOENIX-2546 Filters should take into account that multiple versions may be scanned
Posted by ja...@apache.org.
PHOENIX-2546 Filters should take into account that multiple versions may be scanned
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0baa1d0f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0baa1d0f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0baa1d0f
Branch: refs/heads/4.x-HBase-0.98
Commit: 0baa1d0fe9cad54ca249d9ca39303192fa5e5e59
Parents: 24d9c3d
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Dec 29 23:00:27 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/tx/TransactionIT.java | 147 +++++++++++++++++++
.../phoenix/filter/ColumnProjectionFilter.java | 3 +-
.../MultiCFCQKeyValueComparisonFilter.java | 1 -
.../filter/MultiCQKeyValueComparisonFilter.java | 2 -
.../filter/MultiKeyValueComparisonFilter.java | 69 ++++-----
.../phoenix/filter/RowKeyComparisonFilter.java | 11 +-
.../SingleCFCQKeyValueComparisonFilter.java | 3 -
.../SingleCQKeyValueComparisonFilter.java | 3 -
.../filter/SingleKeyValueComparisonFilter.java | 19 +--
.../apache/phoenix/filter/SkipScanFilter.java | 6 +-
.../schema/tuple/SingleKeyValueTuple.java | 46 +++---
.../phoenix/filter/SkipScanFilterTest.java | 2 +-
12 files changed, 217 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 5a322bb..1aabb03 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -350,6 +351,7 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
assertFalse(rs.next());
}
+ @Ignore
@Test
public void testNonTxToTxTableFailure() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
@@ -725,4 +727,149 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
Result result = htable.get(new Get(Bytes.toBytes("j")));
assertTrue(result.isEmpty());
}
+
+ @Test
+ public void testCheckpointAndRollback() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+ try {
+ String fullTableName = "T";
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true");
+ stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
+ conn.commit();
+
+ stmt.executeUpdate("upsert into " + fullTableName + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName);
+ ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals("x", rs.getString(1));
+ assertEquals("aa", rs.getString(2));
+ assertEquals("a", rs.getString(3));
+ assertFalse(rs.next());
+
+ stmt.executeUpdate("upsert into " + fullTableName + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName);
+
+ rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals("x", rs.getString(1));
+ assertEquals("aaa", rs.getString(2));
+ assertEquals("a", rs.getString(3));
+ assertFalse(rs.next());
+
+ conn.rollback();
+
+ //assert original row exists in fullTableName1
+ rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals("x", rs.getString(1));
+ assertEquals("a", rs.getString(2));
+ assertEquals("a", rs.getString(3));
+ assertFalse(rs.next());
+
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Ignore("Add back once TEPHRA-162 gets fixed")
+ @Test
+ public void testInflightUpdateNotSeen() throws Exception {
+ String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+ try (Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl())) {
+ conn1.setAutoCommit(false);
+ conn2.setAutoCommit(true);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsert);
+ // upsert two rows
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ conn1.commit();
+
+ TestUtil.setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME + " WHERE int_col1 IS NULL");
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+
+ upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, int_col1) VALUES(?, ?, ?, ?, ?, ?, 1)";
+ stmt = conn1.prepareStatement(upsert);
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+
+ rs = conn1.createStatement().executeQuery("SELECT int_col1 FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ rs = conn2.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+ assertFalse(rs.next());
+
+ conn1.commit();
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = conn2.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ }
+ }
+
+ @Ignore("Add back once TEPHRA-162 gets fixed")
+ @Test
+ public void testInflightDeleteNotSeen() throws Exception {
+ String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+ try (Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl())) {
+ conn1.setAutoCommit(false);
+ conn2.setAutoCommit(true);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsert);
+ // upsert two rows
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ TestUtil.setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ conn1.commit();
+
+ rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+
+ String delete = "DELETE FROM " + FULL_TABLE_NAME + " WHERE varchar_pk = 'varchar1'";
+ stmt = conn1.prepareStatement(delete);
+ int count = stmt.executeUpdate();
+ assertEquals(1,count);
+
+ rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertFalse(rs.next());
+
+ conn1.commit();
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+ }
}
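
The in-flight visibility tests above lean on the read-your-own-writes behavior from MutationState.sendUncommitted(): uncommitted rows are pushed to HBase before a query runs, but are tagged with the open transaction's write pointer, so only the writing connection can see them. In miniature, assuming T is a table created with TRANSACTIONAL=true and an illustrative connection URL:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class InflightVisibilitySketch {
        public static void main(String[] args) throws Exception {
            try (Connection writer = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Connection reader = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                writer.setAutoCommit(false);
                writer.createStatement().execute("upsert into T values ('k', 'v')");

                // The writer sees its own uncommitted row...
                ResultSet rs = writer.createStatement().executeQuery("select count(*) from T");
                rs.next();
                System.out.println("writer sees " + rs.getInt(1)); // 1

                // ...but another connection does not until commit.
                rs = reader.createStatement().executeQuery("select count(*) from T");
                rs.next();
                System.out.println("reader sees " + rs.getInt(1)); // 0

                writer.commit();
            }
        }
    }
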
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index d074c56..b8b0350 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
@@ -177,6 +176,6 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
@Override
public ReturnCode filterKeyValue(Cell ignored) throws IOException {
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
}
\ No newline at end of file
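
This one-line change is the crux of PHOENIX-2546. INCLUDE tells the scanner to keep the cell and continue with the next cell, which on a multi-version scan can be an older version of the same column; INCLUDE_AND_NEXT_COL keeps the cell and jumps straight to the next column, so stale versions never reach the projection. A toy filter illustrating the return code (the filter itself is illustrative, not part of Phoenix):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.filter.FilterBase;

    // Keeps only the newest version of each column. With plain INCLUDE, a
    // scan configured to read multiple versions would also hand older cells
    // of the same column to this filter, and they would be included too.
    public class LatestVersionOnlyFilter extends FilterBase {
        @Override
        public ReturnCode filterKeyValue(Cell cell) throws IOException {
            return ReturnCode.INCLUDE_AND_NEXT_COL;
        }
    }
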
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
index 9147f1a..3bd1fd7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
@@ -31,7 +31,6 @@ import org.apache.phoenix.expression.Expression;
* are references to multiple column qualifiers over multiple column families.
The same qualifier names may also appear in different column families.
*
- * @since 0.1
*/
public class MultiCFCQKeyValueComparisonFilter extends MultiKeyValueComparisonFilter {
private final ImmutablePairBytesPtr ptr = new ImmutablePairBytesPtr();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
index 5fa5035..91e4392 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
@@ -29,8 +29,6 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
* Filter that evaluates WHERE clause expression, used in the case where there
* are references to multiple unique column qualifiers over one or more column families.
*
- *
- * @since 0.1
*/
public class MultiCQKeyValueComparisonFilter extends MultiKeyValueComparisonFilter {
private ImmutableBytesPtr ptr = new ImmutableBytesPtr();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index 1cb2255..569faa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -24,8 +24,6 @@ import java.util.Map;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.expression.Expression;
@@ -41,8 +39,6 @@ import org.apache.phoenix.schema.tuple.BaseTuple;
* but for general expression evaluation in the case where multiple KeyValue
* columns are referenced in the expression.
*
- *
- * @since 0.1
*/
public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFilter {
private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
@@ -59,14 +55,14 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
init();
}
- private static final class KeyValueRef {
- public KeyValue keyValue;
+ private static final class CellRef {
+ public Cell cell;
@Override
public String toString() {
- if(keyValue != null) {
- return keyValue.toString() + " value = " + Bytes.toStringBinary(
- keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+ if(cell != null) {
+ return cell.toString() + " value = " + Bytes.toStringBinary(
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
} else {
return super.toString();
}
@@ -79,13 +75,13 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
private final class IncrementalResultTuple extends BaseTuple {
private int refCount;
private final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
- private final Map<Object,KeyValueRef> foundColumns = new HashMap<Object,KeyValueRef>(5);
+ private final Map<Object,CellRef> foundColumns = new HashMap<Object,CellRef>(5);
public void reset() {
refCount = 0;
keyPtr.set(UNITIALIZED_KEY_BUFFER);
- for (KeyValueRef ref : foundColumns.values()) {
- ref.keyValue = null;
+ for (CellRef ref : foundColumns.values()) {
+ ref.cell = null;
}
}
@@ -98,39 +94,39 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
refCount = foundColumns.size();
}
- public ReturnCode resolveColumn(KeyValue value) {
+ public ReturnCode resolveColumn(Cell value) {
// Always set key, in case we never find a key value column of interest,
// and our expression uses row key columns.
setKey(value);
Object ptr = setColumnKey(value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength());
- KeyValueRef ref = foundColumns.get(ptr);
+ CellRef ref = foundColumns.get(ptr);
if (ref == null) {
- // Return INCLUDE here. Although this filter doesn't need this KV
+ // Return INCLUDE_AND_NEXT_COL here. Although this filter doesn't need this KV
// it should still be projected into the Result
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
// Since we only look at the latest key value for a given column,
// we are not interested in older versions
// TODO: test with older versions to confirm this doesn't get tripped
// This shouldn't be necessary, because a scan only looks at the latest
// version
- if (ref.keyValue != null) {
+ if (ref.cell != null) {
// Can't do NEXT_ROW, because then we don't match the other columns
// SKIP, INCLUDE, and NEXT_COL seem to all act the same
return ReturnCode.NEXT_COL;
}
- ref.keyValue = value;
+ ref.cell = value;
refCount++;
return null;
}
public void addColumn(byte[] cf, byte[] cq) {
Object ptr = MultiKeyValueComparisonFilter.this.newColumnKey(cf, 0, cf.length, cq, 0, cq.length);
- foundColumns.put(ptr, new KeyValueRef());
+ foundColumns.put(ptr, new CellRef());
}
- public void setKey(KeyValue value) {
+ public void setKey(Cell value) {
keyPtr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength());
}
@@ -140,10 +136,10 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
}
@Override
- public KeyValue getValue(byte[] cf, byte[] cq) {
+ public Cell getValue(byte[] cf, byte[] cq) {
Object ptr = setColumnKey(cf, 0, cf.length, cq, 0, cq.length);
- KeyValueRef ref = foundColumns.get(ptr);
- return ref == null ? null : ref.keyValue;
+ CellRef ref = foundColumns.get(ptr);
+ return ref == null ? null : ref.cell;
}
@Override
@@ -157,15 +153,15 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
}
@Override
- public KeyValue getValue(int index) {
+ public Cell getValue(int index) {
// This won't perform very well, but it's not
// currently used anyway
- for (KeyValueRef ref : foundColumns.values()) {
- if (ref.keyValue == null) {
+ for (CellRef ref : foundColumns.values()) {
+ if (ref.cell == null) {
continue;
}
if (index == 0) {
- return ref.keyValue;
+ return ref.cell;
}
index--;
}
@@ -175,10 +171,10 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
@Override
public boolean getValue(byte[] family, byte[] qualifier,
ImmutableBytesWritable ptr) {
- KeyValue kv = getValue(family, qualifier);
- if (kv == null)
+ Cell cell = getValue(family, qualifier);
+ if (cell == null)
return false;
- ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
return true;
}
}
@@ -197,17 +193,17 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
}
@Override
- public ReturnCode filterKeyValue(Cell keyValue) {
+ public ReturnCode filterKeyValue(Cell cell) {
if (Boolean.TRUE.equals(this.matchedColumn)) {
// We already found and matched the single column, all keys now pass
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
if (Boolean.FALSE.equals(this.matchedColumn)) {
// We found all the columns, but did not match the expression, so skip to next row
return ReturnCode.NEXT_ROW;
}
// This is a key value we're not interested in (TODO: why INCLUDE here instead of NEXT_COL?)
- ReturnCode code = inputTuple.resolveColumn(KeyValueUtil.ensureKeyValue(keyValue));
+ ReturnCode code = inputTuple.resolveColumn(cell);
if (code != null) {
return code;
}
@@ -220,10 +216,10 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
if (inputTuple.isImmutable()) {
this.matchedColumn = Boolean.FALSE;
} else {
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
}
- return this.matchedColumn ? ReturnCode.INCLUDE : ReturnCode.NEXT_ROW;
+ return this.matchedColumn ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW;
}
@Override
@@ -243,8 +239,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
super.reset();
}
- @SuppressWarnings("all")
- // suppressing missing @Override since this doesn't exist for HBase 0.94.4
+ @Override
public boolean isFamilyEssential(byte[] name) {
// Only the column families involved in the expression are essential.
// The others are for columns projected in the select expression.
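
The KeyValueRef-to-CellRef migration above also drops the KeyValueUtil.ensureKeyValue() calls: everything the filter needs can be read straight off the Cell interface, avoiding a possible copy into a materialized KeyValue. The access pattern, in a minimal illustrative helper:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    public class CellValueSketch {
        // Point ptr at a cell's value bytes without materializing a KeyValue.
        static void setValue(Cell cell, ImmutableBytesWritable ptr) {
            // Array/offset/length accessors reference the backing buffer
            // directly; no per-cell copy is needed.
            ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
        }
    }
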
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
index b7de7ac..2eb69bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
@@ -22,7 +22,6 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory;
*
* Filter for use when expressions only reference row key columns
*
- *
- * @since 0.1
*/
public class RowKeyComparisonFilter extends BooleanExpressionFilter {
private static final Logger logger = LoggerFactory.getLogger(RowKeyComparisonFilter.class);
@@ -79,7 +76,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
}
evaluate = false;
}
- return keepRow ? ReturnCode.INCLUDE : ReturnCode.NEXT_ROW;
+ return keepRow ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW;
}
private final class RowKeyTuple extends BaseTuple {
@@ -99,7 +96,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
}
@Override
- public KeyValue getValue(byte[] cf, byte[] cq) {
+ public Cell getValue(byte[] cf, byte[] cq) {
return null;
}
@@ -119,7 +116,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
}
@Override
- public KeyValue getValue(int index) {
+ public Cell getValue(int index) {
throw new IndexOutOfBoundsException(Integer.toString(index));
}
@@ -135,7 +132,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
return !this.keepRow;
}
- @SuppressWarnings("all") // suppressing missing @Override since this doesn't exist for HBase 0.94.4
+ @Override
public boolean isFamilyEssential(byte[] name) {
// We only need our "guaranteed to have a key value" column family,
// which we pass in and serialize through. In the case of a VIEW where
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
index 963fe59..c63673c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
-
import org.apache.phoenix.expression.Expression;
@@ -32,8 +31,6 @@ import org.apache.phoenix.expression.Expression;
* column qualifier parts of the key value to disambiguate with another similarly
* named column qualifier in a different column family.
*
- *
- * @since 0.1
*/
public class SingleCFCQKeyValueComparisonFilter extends SingleKeyValueComparisonFilter {
public SingleCFCQKeyValueComparisonFilter() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
index 240c8a6..0d904bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
-
import org.apache.phoenix.expression.Expression;
@@ -32,8 +31,6 @@ import org.apache.phoenix.expression.Expression;
* part of the key value since the column qualifier is unique across all column
* families.
*
- *
- * @since 0.1
*/
public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFilter {
public SingleCQKeyValueComparisonFilter() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index 8929f8a..eaf8d35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -37,8 +36,6 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
* but for general expression evaluation in the case where only a single KeyValue
* column is referenced in the expression.
*
- *
- * @since 0.1
*/
public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFilter {
private final SingleKeyValueTuple inputTuple = new SingleKeyValueTuple();
@@ -76,8 +73,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
public ReturnCode filterKeyValue(Cell keyValue) {
if (this.matchedColumn) {
// We already found and matched the single column, all keys now pass
- // TODO: why won't this cause earlier versions of a kv to be included?
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
if (this.foundColumn()) {
// We found all the columns, but did not match the expression, so skip to next row
@@ -87,19 +83,18 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength()) != 0) {
// Remember the key in case this is the only key value we see.
// We'll need it if we have row key columns too.
- inputTuple.setKey(KeyValueUtil.ensureKeyValue(keyValue));
+ inputTuple.setKey(keyValue);
// This is a key value we're not interested in
- // TODO: use NEXT_COL when bug fix comes through that includes the row still
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
- inputTuple.setKeyValue(KeyValueUtil.ensureKeyValue(keyValue));
+ inputTuple.setCell(keyValue);
// We have the columns, so evaluate here
if (!Boolean.TRUE.equals(evaluate(inputTuple))) {
return ReturnCode.NEXT_ROW;
}
this.matchedColumn = true;
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
@Override
@@ -124,7 +119,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
return true;
}
- @Override
+ @Override
public void reset() {
inputTuple.reset();
matchedColumn = false;
@@ -137,7 +132,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
init();
}
- @SuppressWarnings("all") // suppressing missing @Override since this doesn't exist for HBase 0.94.4
+ @Override
public boolean isFamilyEssential(byte[] name) {
// Only the column families involved in the expression are essential.
// The others are for columns projected in the select expression
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 667b3d7..77b4cf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -274,7 +274,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
// more than we need. We can optimize this by tracking whether each range in each slot position
// intersects.
ReturnCode endCode = navigate(upperExclusiveKey, 0, upperExclusiveKey.length, Terminate.AT);
- if (endCode == ReturnCode.INCLUDE) {
+ if (endCode == ReturnCode.INCLUDE || endCode == ReturnCode.INCLUDE_AND_NEXT_COL) {
setStartKey();
// If the upperExclusiveKey is equal to the start key, we've gone one position too far, since
// our upper key is exclusive. In that case, go to the previous key
@@ -358,7 +358,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
// First check to see if we're in-range until we reach our end key
if (endKeyLength > 0) {
if (Bytes.compareTo(currentKey, offset, length, endKey, 0, endKeyLength) < 0) {
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
// If key range of last slot is a single key, we can increment our position
@@ -490,7 +490,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
// up to the upper range of our last slot. We do this for ranges and single keys
// since we potentially have multiple key values for the same row key.
setEndKey(ptr, minOffset, i);
- return ReturnCode.INCLUDE;
+ return ReturnCode.INCLUDE_AND_NEXT_COL;
}
private boolean allTrailingNulls(int i) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
index 8226208..e35eb13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
@@ -17,24 +17,24 @@
*/
package org.apache.phoenix.schema.tuple;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
public class SingleKeyValueTuple extends BaseTuple {
private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
- private KeyValue keyValue;
+ private Cell cell;
private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
public SingleKeyValueTuple() {
}
- public SingleKeyValueTuple(KeyValue keyValue) {
- if (keyValue == null) {
+ public SingleKeyValueTuple(Cell cell) {
+ if (cell == null) {
throw new NullPointerException();
}
- setKeyValue(keyValue);
+ setCell(cell);
}
public boolean hasKey() {
@@ -42,28 +42,27 @@ public class SingleKeyValueTuple extends BaseTuple {
}
public void reset() {
- this.keyValue = null;
+ this.cell = null;
keyPtr.set(UNITIALIZED_KEY_BUFFER);
}
- public void setKeyValue(KeyValue keyValue) {
- if (keyValue == null) {
+ public void setCell(Cell cell) {
+ if (cell == null) {
throw new IllegalArgumentException();
}
- this.keyValue = keyValue;
- setKey(keyValue);
+ this.cell = cell;
+ setKey(cell);
}
public void setKey(ImmutableBytesWritable ptr) {
keyPtr.set(ptr.get(), ptr.getOffset(), ptr.getLength());
}
- @SuppressWarnings("deprecation")
- public void setKey(KeyValue keyValue) {
- if (keyValue == null) {
+ public void setKey(Cell cell) {
+ if (cell == null) {
throw new IllegalArgumentException();
}
- keyPtr.set(keyValue.getBuffer(), keyValue.getRowOffset(), keyValue.getRowLength());
+ keyPtr.set(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
@Override
@@ -72,8 +71,8 @@ public class SingleKeyValueTuple extends BaseTuple {
}
@Override
- public KeyValue getValue(byte[] cf, byte[] cq) {
- return keyValue;
+ public Cell getValue(byte[] cf, byte[] cq) {
+ return cell;
}
@Override
@@ -83,29 +82,28 @@ public class SingleKeyValueTuple extends BaseTuple {
@Override
public String toString() {
- return "SingleKeyValueTuple[" + keyValue == null ? keyPtr.get() == UNITIALIZED_KEY_BUFFER ? "null" : Bytes.toStringBinary(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()) : keyValue.toString() + "]";
+ return "SingleKeyValueTuple[" + cell == null ? keyPtr.get() == UNITIALIZED_KEY_BUFFER ? "null" : Bytes.toStringBinary(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()) : cell.toString() + "]";
}
@Override
public int size() {
- return keyValue == null ? 0 : 1;
+ return cell == null ? 0 : 1;
}
@Override
- public KeyValue getValue(int index) {
- if (index != 0 || keyValue == null) {
+ public Cell getValue(int index) {
+ if (index != 0 || cell == null) {
throw new IndexOutOfBoundsException(Integer.toString(index));
}
- return keyValue;
+ return cell;
}
- @SuppressWarnings("deprecation")
@Override
public boolean getValue(byte[] family, byte[] qualifier,
ImmutableBytesWritable ptr) {
- if (keyValue == null)
+ if (cell == null)
return false;
- ptr.set(keyValue.getBuffer(), keyValue.getValueOffset(), keyValue.getValueLength());
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
return true;
}
}
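
For context on the migration above: KeyValue's deprecated getBuffer() exposed a single backing array for the whole key value, whereas the Cell interface exposes a separate array/offset/length triple per component. A minimal sketch of the new access pattern (the class name and literal values are illustrative, not part of the commit; KeyValue appears only because it implements Cell):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class CellAccessorExample {
    public static void main(String[] args) {
        // KeyValue implements Cell, so it serves as a concrete stand-in here.
        Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
                Bytes.toBytes("cq"), Bytes.toBytes("value1"));

        // Instead of the deprecated whole-buffer accessor, read each
        // component through its own array/offset/length methods:
        String row = Bytes.toString(cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength());
        String value = Bytes.toString(cell.getValueArray(),
                cell.getValueOffset(), cell.getValueLength());

        System.out.println(row + " -> " + value); // prints: row1 -> value1
    }
}
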
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
index 898f778..4cb71ff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
@@ -402,7 +402,7 @@ public class SkipScanFilterTest extends TestCase {
skipper.reset();
assertFalse(skipper.filterAllRemaining());
assertFalse(skipper.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()));
- assertEquals(kv.toString(), ReturnCode.INCLUDE, skipper.filterKeyValue(kv));
+ assertEquals(kv.toString(), ReturnCode.INCLUDE_AND_NEXT_COL, skipper.filterKeyValue(kv));
}
@Override public String toString() {
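
The recurring change in this commit is the switch from ReturnCode.INCLUDE to ReturnCode.INCLUDE_AND_NEXT_COL, which also resolves the old TODOs: the scanner emits the current cell and then seeks directly to the next column, so earlier versions of the same cell are never surfaced. A minimal sketch of the semantics in a custom filter (the class is illustrative, not Phoenix code):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.filter.FilterBase;

// Keeps only the newest version of each column: INCLUDE_AND_NEXT_COL emits
// the current cell, then seeks past any older versions of the same
// row/family/qualifier straight to the next qualifier.
public class NewestVersionOnlyFilter extends FilterBase {
    @Override
    public ReturnCode filterKeyValue(Cell cell) {
        return ReturnCode.INCLUDE_AND_NEXT_COL;
    }
}
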
[3/7] phoenix git commit: PHOENIX-2551 Remove Jdbc7Shim as it's no longer necessary
Posted by ja...@apache.org.
PHOENIX-2551 Remove Jdbc7Shim as it's no longer necessary
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e44e7a4e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e44e7a4e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e44e7a4e
Branch: refs/heads/4.x-HBase-0.98
Commit: e44e7a4e5061f0deee30ace0ff9bda515798d397
Parents: 0baa1d0
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Dec 30 09:19:41 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800
----------------------------------------------------------------------
.../phoenix/trace/DelegateConnection.java | 324 ++++++++++++++++++
.../phoenix/trace/DelegatingConnection.java | 328 -------------------
.../phoenix/trace/PhoenixTracingEndToEndIT.java | 3 +-
.../java/org/apache/phoenix/jdbc/Jdbc7Shim.java | 64 ----
.../apache/phoenix/jdbc/PhoenixConnection.java | 2 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +-
.../phoenix/jdbc/PhoenixEmbeddedDriver.java | 3 +-
.../apache/phoenix/jdbc/PhoenixResultSet.java | 2 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 2 +-
9 files changed, 330 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java
new file mode 100644
index 0000000..8fff469
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * Simple {@link Connection} that just delegates to an underlying {@link Connection}.
+ */
+public class DelegateConnection implements Connection {
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return conn.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return conn.isWrapperFor(iface);
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return conn.createStatement();
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return conn.prepareStatement(sql);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return conn.prepareCall(sql);
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return conn.nativeSQL(sql);
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+ conn.setAutoCommit(autoCommit);
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return conn.getAutoCommit();
+ }
+
+ @Override
+ public void commit() throws SQLException {
+ conn.commit();
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+ conn.rollback();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ conn.close();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return conn.isClosed();
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return conn.getMetaData();
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+ conn.setReadOnly(readOnly);
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return conn.isReadOnly();
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+ conn.setCatalog(catalog);
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return conn.getCatalog();
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+ conn.setTransactionIsolation(level);
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return conn.getTransactionIsolation();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return conn.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ conn.clearWarnings();
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return conn.createStatement(resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public PreparedStatement
+ prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return conn.getTypeMap();
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+ conn.setTypeMap(map);
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+ conn.setHoldability(holdability);
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return conn.getHoldability();
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return conn.setSavepoint();
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return conn.setSavepoint(name);
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+ conn.rollback(savepoint);
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+ conn.releaseSavepoint(savepoint);
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) throws SQLException {
+ return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType,
+ int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+ int resultSetHoldability) throws SQLException {
+ return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return conn.prepareStatement(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return conn.prepareStatement(sql, columnIndexes);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return conn.prepareStatement(sql, columnNames);
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return conn.createClob();
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return conn.createBlob();
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return conn.createNClob();
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return conn.createSQLXML();
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return conn.isValid(timeout);
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+ conn.setClientInfo(name, value);
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+ conn.setClientInfo(properties);
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return conn.getClientInfo(name);
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return conn.getClientInfo();
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return conn.createArrayOf(typeName, elements);
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return conn.createStruct(typeName, attributes);
+ }
+
+ private Connection conn;
+
+ public DelegateConnection(Connection conn) {
+ this.conn = conn;
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+ conn.setSchema(schema);
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return conn.getSchema();
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {
+ conn.abort(executor);
+ }
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+ conn.setNetworkTimeout(executor, milliseconds);
+ }
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return conn.getNetworkTimeout();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
deleted file mode 100644
index 261522d..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.trace;
-
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.CallableStatement;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.NClob;
-import java.sql.PreparedStatement;
-import java.sql.SQLClientInfoException;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Savepoint;
-import java.sql.Statement;
-import java.sql.Struct;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-import org.apache.phoenix.jdbc.Jdbc7Shim;
-
-/**
- * Simple {@link Connection} that just delegates to an underlying {@link Connection}.
- * @param <D> delegate type that is both a {@link Connection} and a {@link Jdbc7Shim#Connection}
- */
-public class DelegatingConnection<D extends Connection & Jdbc7Shim.Connection> implements
- Connection, Jdbc7Shim.Connection {
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- return conn.unwrap(iface);
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- return conn.isWrapperFor(iface);
- }
-
- @Override
- public Statement createStatement() throws SQLException {
- return conn.createStatement();
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql) throws SQLException {
- return conn.prepareStatement(sql);
- }
-
- @Override
- public CallableStatement prepareCall(String sql) throws SQLException {
- return conn.prepareCall(sql);
- }
-
- @Override
- public String nativeSQL(String sql) throws SQLException {
- return conn.nativeSQL(sql);
- }
-
- @Override
- public void setAutoCommit(boolean autoCommit) throws SQLException {
- conn.setAutoCommit(autoCommit);
- }
-
- @Override
- public boolean getAutoCommit() throws SQLException {
- return conn.getAutoCommit();
- }
-
- @Override
- public void commit() throws SQLException {
- conn.commit();
- }
-
- @Override
- public void rollback() throws SQLException {
- conn.rollback();
- }
-
- @Override
- public void close() throws SQLException {
- conn.close();
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return conn.isClosed();
- }
-
- @Override
- public DatabaseMetaData getMetaData() throws SQLException {
- return conn.getMetaData();
- }
-
- @Override
- public void setReadOnly(boolean readOnly) throws SQLException {
- conn.setReadOnly(readOnly);
- }
-
- @Override
- public boolean isReadOnly() throws SQLException {
- return conn.isReadOnly();
- }
-
- @Override
- public void setCatalog(String catalog) throws SQLException {
- conn.setCatalog(catalog);
- }
-
- @Override
- public String getCatalog() throws SQLException {
- return conn.getCatalog();
- }
-
- @Override
- public void setTransactionIsolation(int level) throws SQLException {
- conn.setTransactionIsolation(level);
- }
-
- @Override
- public int getTransactionIsolation() throws SQLException {
- return conn.getTransactionIsolation();
- }
-
- @Override
- public SQLWarning getWarnings() throws SQLException {
- return conn.getWarnings();
- }
-
- @Override
- public void clearWarnings() throws SQLException {
- conn.clearWarnings();
- }
-
- @Override
- public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
- return conn.createStatement(resultSetType, resultSetConcurrency);
- }
-
- @Override
- public PreparedStatement
- prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
- }
-
- @Override
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
- throws SQLException {
- return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
- }
-
- @Override
- public Map<String, Class<?>> getTypeMap() throws SQLException {
- return conn.getTypeMap();
- }
-
- @Override
- public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
- conn.setTypeMap(map);
- }
-
- @Override
- public void setHoldability(int holdability) throws SQLException {
- conn.setHoldability(holdability);
- }
-
- @Override
- public int getHoldability() throws SQLException {
- return conn.getHoldability();
- }
-
- @Override
- public Savepoint setSavepoint() throws SQLException {
- return conn.setSavepoint();
- }
-
- @Override
- public Savepoint setSavepoint(String name) throws SQLException {
- return conn.setSavepoint(name);
- }
-
- @Override
- public void rollback(Savepoint savepoint) throws SQLException {
- conn.rollback(savepoint);
- }
-
- @Override
- public void releaseSavepoint(Savepoint savepoint) throws SQLException {
- conn.releaseSavepoint(savepoint);
- }
-
- @Override
- public Statement createStatement(int resultSetType, int resultSetConcurrency,
- int resultSetHoldability) throws SQLException {
- return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int resultSetType,
- int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
- }
-
- @Override
- public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
- int resultSetHoldability) throws SQLException {
- return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- return conn.prepareStatement(sql, autoGeneratedKeys);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- return conn.prepareStatement(sql, columnIndexes);
- }
-
- @Override
- public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- return conn.prepareStatement(sql, columnNames);
- }
-
- @Override
- public Clob createClob() throws SQLException {
- return conn.createClob();
- }
-
- @Override
- public Blob createBlob() throws SQLException {
- return conn.createBlob();
- }
-
- @Override
- public NClob createNClob() throws SQLException {
- return conn.createNClob();
- }
-
- @Override
- public SQLXML createSQLXML() throws SQLException {
- return conn.createSQLXML();
- }
-
- @Override
- public boolean isValid(int timeout) throws SQLException {
- return conn.isValid(timeout);
- }
-
- @Override
- public void setClientInfo(String name, String value) throws SQLClientInfoException {
- conn.setClientInfo(name, value);
- }
-
- @Override
- public void setClientInfo(Properties properties) throws SQLClientInfoException {
- conn.setClientInfo(properties);
- }
-
- @Override
- public String getClientInfo(String name) throws SQLException {
- return conn.getClientInfo(name);
- }
-
- @Override
- public Properties getClientInfo() throws SQLException {
- return conn.getClientInfo();
- }
-
- @Override
- public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
- return conn.createArrayOf(typeName, elements);
- }
-
- @Override
- public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
- return conn.createStruct(typeName, attributes);
- }
-
- private D conn;
-
- public DelegatingConnection(D conn) {
- this.conn = conn;
- }
-
- @Override
- public void setSchema(String schema) throws SQLException {
- conn.setSchema(schema);
- }
-
- @Override
- public String getSchema() throws SQLException {
- return conn.getSchema();
- }
-
- @Override
- public void abort(Executor executor) throws SQLException {
- conn.abort(executor);
- }
-
- @Override
- public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
- conn.setNetworkTimeout(executor, milliseconds);
- }
-
- @Override
- public int getNetworkTimeout() throws SQLException {
- return conn.getNetworkTimeout();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
index 05d9e41..686b5f1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -496,10 +496,9 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
}
}
- private static class CountDownConnection extends DelegatingConnection {
+ private static class CountDownConnection extends DelegateConnection {
private CountDownLatch commit;
- @SuppressWarnings("unchecked")
public CountDownConnection(Connection conn, CountDownLatch commit) {
super(conn);
this.commit = commit;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
deleted file mode 100644
index f51da30..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.jdbc;
-
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.concurrent.Executor;
-import java.util.logging.Logger;
-
-/**
- * Interfaces to be implemented by classes that need to be "JDK7" compliant,
- * but also run in JDK6
- */
-public final class Jdbc7Shim {
-
- public interface Statement { // Note: do not extend "regular" statement or else eclipse 3.7 complains
- void closeOnCompletion() throws SQLException;
- boolean isCloseOnCompletion() throws SQLException;
- }
-
- public interface CallableStatement extends Statement {
- public <T> T getObject(int columnIndex, Class<T> type) throws SQLException;
- public <T> T getObject(String columnLabel, Class<T> type) throws SQLException;
- }
-
- public interface Connection {
- void setSchema(String schema) throws SQLException;
- String getSchema() throws SQLException;
- void abort(Executor executor) throws SQLException;
- void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException;
- int getNetworkTimeout() throws SQLException;
- }
-
- public interface ResultSet {
- public <T> T getObject(int columnIndex, Class<T> type) throws SQLException;
- public <T> T getObject(String columnLabel, Class<T> type) throws SQLException;
- }
-
- public interface DatabaseMetaData {
- java.sql.ResultSet getPseudoColumns(String catalog, String schemaPattern,
- String tableNamePattern, String columnNamePattern)
- throws SQLException;
- boolean generatedKeyAlwaysReturned() throws SQLException;
- }
-
- public interface Driver {
- public Logger getParentLogger() throws SQLFeatureNotSupportedException;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index ed906a4..23d3235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -122,7 +122,7 @@ import co.cask.tephra.TransactionContext;
*
* @since 0.1
*/
-public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jdbc7Shim.Connection, MetaDataMutated{
+public class PhoenixConnection implements Connection, MetaDataMutated {
private final String url;
private final ConnectionQueryServices services;
private final Properties info;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 940ca52..87c02d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -98,7 +98,7 @@ import com.google.common.collect.Lists;
*
* @since 0.1
*/
-public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.phoenix.jdbc.Jdbc7Shim.DatabaseMetaData {
+public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final int INDEX_NAME_INDEX = 4; // Shared with FAMILY_NAME_INDEX
public static final int FAMILY_NAME_INDEX = 4;
public static final int COLUMN_NAME_INDEX = 3;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 8ee9dbd..c49bf37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -31,7 +31,6 @@ import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -55,7 +54,7 @@ import com.google.common.collect.Maps;
* @since 0.1
*/
@Immutable
-public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoenix.jdbc.Jdbc7Shim.Driver, SQLCloseable {
+public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
/**
* The protocol for Phoenix Network Client
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index c1bbe81..a3ce1a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -100,7 +100,7 @@ import com.google.common.annotations.VisibleForTesting;
*
* @since 0.1
*/
-public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.ResultSet {
+public class PhoenixResultSet implements ResultSet, SQLCloseable {
private static final Log LOG = LogFactory.getLog(PhoenixResultSet.class);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 550e915..f36bbd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -177,7 +177,7 @@ import com.google.common.math.IntMath;
*
* @since 0.1
*/
-public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
+public class PhoenixStatement implements Statement, SQLCloseable {
private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class);
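
With the shim gone, the JDBC 4.1 methods (getSchema(), abort(Executor), and friends) come straight from java.sql.Connection, which implies a JDK 7 baseline. A wrapper now only needs to extend DelegateConnection and override what it cares about, as CountDownConnection does above. A minimal sketch (the class name is ours; DelegateConnection lives in the test tree, so this assumes it is on the classpath):

import java.sql.Connection;
import java.sql.SQLException;

import org.apache.phoenix.trace.DelegateConnection;

// Logs every commit and delegates everything else to the wrapped connection.
public class CommitLoggingConnection extends DelegateConnection {
    public CommitLoggingConnection(Connection conn) {
        super(conn);
    }

    @Override
    public void commit() throws SQLException {
        System.out.println("commit() called");
        super.commit();
    }
}
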
[5/7] phoenix git commit: PHOENIX-2453 Make prepared statement creation delegate to org.apache.phoenix.jdbc.PhoenixConnection#prepareStatement(java.lang.String) (Clement Escoffier)
Posted by ja...@apache.org.
PHOENIX-2453 Make prepared statement creation delegate to org.apache.phoenix.jdbc.PhoenixConnection#prepareStatement(java.lang.String) (Clement Escoffier)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/24d9c3d6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/24d9c3d6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/24d9c3d6
Branch: refs/heads/4.x-HBase-0.98
Commit: 24d9c3d6354a6be695c46747d9349d94c614744c
Parents: 4e96747
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Dec 28 22:42:46 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800
----------------------------------------------------------------------
.../apache/phoenix/jdbc/PhoenixConnection.java | 9 ++++--
.../apache/phoenix/jdbc/PhoenixStatement.java | 12 ++++----
.../jdbc/PhoenixPreparedStatementTest.java | 30 +++++++++++++++++---
3 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/24d9c3d6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d301321..ed906a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -692,17 +692,20 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ // Ignore autoGeneratedKeys, and just prepare the statement.
+ return prepareStatement(sql);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ // Ignore columnIndexes, and just prepare the statement.
+ return prepareStatement(sql);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ // Ignore columnNames, and just prepare the statement.
+ return prepareStatement(sql);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/24d9c3d6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 86a139b..550e915 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1300,32 +1300,32 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return execute(sql);
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return execute(sql);
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return execute(sql);
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return executeUpdate(sql);
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return executeUpdate(sql);
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
- throw new SQLFeatureNotSupportedException();
+ return executeUpdate(sql);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/24d9c3d6/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
index f9b0274..287fb4b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
@@ -20,14 +20,12 @@ package org.apache.phoenix.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import java.sql.*;
import java.util.Properties;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.junit.Test;
@@ -180,4 +178,28 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
}
+ @Test
+ public void testPreparedStatementWithGeneratedKeys() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement statement = conn.prepareStatement("CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null, v1 VARCHAR(15), v2 DATE, " +
+ "v3 VARCHAR CONSTRAINT pk PRIMARY KEY (pk1, pk2))", PreparedStatement.RETURN_GENERATED_KEYS);
+ statement.execute();
+ }
+
+ @Test
+ public void testPreparedStatementWithColumnIndexes() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement statement = conn.prepareStatement("CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null, v1 VARCHAR(15), v2 DATE, " +
+ "v3 VARCHAR CONSTRAINT pk PRIMARY KEY (pk1, pk2))", new int[0]);
+ statement.execute();
+ }
+
+ @Test
+ public void testPreparedStatementWithColumnNames() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement statement = conn.prepareStatement("CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null, v1 VARCHAR(15), v2 DATE, " +
+ "v3 VARCHAR CONSTRAINT pk PRIMARY KEY (pk1, pk2))", new String[0]);
+ statement.execute();
+ }
+
}
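
The practical effect, exercised by the tests above: clients and frameworks that unconditionally pass generated-key hints no longer fail with SQLFeatureNotSupportedException; the hint is ignored and the statement is prepared normally. A usage sketch (the JDBC URL is illustrative, and a table T(K VARCHAR PRIMARY KEY, V VARCHAR) is assumed to exist):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class GeneratedKeysExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                DriverManager.getConnection("jdbc:phoenix:localhost")) {
            // Previously threw SQLFeatureNotSupportedException; the flag is
            // now ignored and this behaves like prepareStatement(sql).
            PreparedStatement ps = conn.prepareStatement(
                    "UPSERT INTO T VALUES (?, ?)",
                    Statement.RETURN_GENERATED_KEYS);
            ps.setString(1, "a");
            ps.setString(2, "b");
            ps.executeUpdate();
            conn.commit();
        }
    }
}
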
[6/7] phoenix git commit: PHOENIX-2550 Increase JDBC major/minor version for 4.7 release
Posted by ja...@apache.org.
PHOENIX-2550 Increase JDBC major/minor version for 4.7 release
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3172ed45
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3172ed45
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3172ed45
Branch: refs/heads/4.x-HBase-0.98
Commit: 3172ed459d91a5387f29fa99b5d352a38ea0c764
Parents: e44e7a4
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Dec 30 09:32:53 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800
----------------------------------------------------------------------
.../main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3172ed45/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index ba06828..bf4a192 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -55,7 +55,7 @@ import com.google.protobuf.ByteString;
*/
public abstract class MetaDataProtocol extends MetaDataService {
public static final int PHOENIX_MAJOR_VERSION = 4;
- public static final int PHOENIX_MINOR_VERSION = 6;
+ public static final int PHOENIX_MINOR_VERSION = 7;
public static final int PHOENIX_PATCH_NUMBER = 0;
public static final int PHOENIX_VERSION =
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
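
For readers unfamiliar with the encoded form: PHOENIX_VERSION packs the three components into one int so that versions compare numerically. A sketch of the idea (this mirrors the intent of VersionUtil.encodeVersion with one byte per component, not necessarily its exact implementation):

public class VersionPackingExample {
    // One byte per component: 4.7.0 -> 0x040700.
    static int encode(int major, int minor, int patch) {
        return (major << 16) | (minor << 8) | patch;
    }

    public static void main(String[] args) {
        System.out.printf("4.7.0 -> 0x%06X%n", encode(4, 7, 0));
        // Numeric order matches version order: 4.7.0 > 4.6.0.
        System.out.println(encode(4, 7, 0) > encode(4, 6, 0)); // true
    }
}
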
[2/7] phoenix git commit: PHOENIX-2411 Allow Phoenix to participate as transactional component
Posted by ja...@apache.org.
PHOENIX-2411 Allow Phoenix to participate as transactional component
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4e967475
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4e967475
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4e967475
Branch: refs/heads/4.x-HBase-0.98
Commit: 4e96747536078324f2c9044fc9815501e692aeed
Parents: 7e4d39e
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Dec 26 23:56:59 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:27 2015 -0800
----------------------------------------------------------------------
.../org/apache/phoenix/tx/TransactionIT.java | 136 ++++++++++++++++++-
.../phoenix/exception/SQLExceptionCode.java | 2 +
.../apache/phoenix/execute/MutationState.java | 9 +-
.../apache/phoenix/jdbc/PhoenixConnection.java | 56 ++++++--
.../apache/phoenix/jdbc/PhoenixStatement.java | 13 +-
.../org/apache/phoenix/query/QueryServices.java | 9 +-
.../phoenix/query/QueryServicesOptions.java | 6 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 61 ++++++---
8 files changed, 247 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 0b3ba91..5a322bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -20,8 +20,6 @@ package org.apache.phoenix.tx;
import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
-import static org.apache.phoenix.util.TestUtil.analyzeTable;
-import static org.apache.phoenix.util.TestUtil.getAllSplits;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -41,22 +39,22 @@ import java.util.Properties;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
@@ -66,12 +64,15 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
public class TransactionIT extends BaseHBaseManagedTimeIT {
private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
@@ -603,4 +604,125 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
}
assertEquals(5, count);
}
+
+ @Test
+ public void testExternalTxContext() throws Exception {
+ ResultSet rs;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+
+ TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient();
+
+ String fullTableName = "T";
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
+ HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
+ stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
+ conn.commit();
+
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ }
+
+ // Use HBase level Tephra APIs to start a new transaction
+ TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
+ TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
+ txContext.start();
+
+ // Use HBase APIs to add a new row
+ Put put = new Put(Bytes.toBytes("z"));
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b"));
+ txAware.put(put);
+
+ // Use Phoenix APIs to add new row (sharing the transaction context)
+ pconn.setTransactionContext(txContext);
+ conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')");
+
+ // New connection should not see data as it hasn't been committed yet
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ }
+
+ // Use new connection to create a row with a conflict
+ Connection connWithConflict = DriverManager.getConnection(getUrl(), props);
+ connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')");
+
+ // Existing connection should see data even though it hasn't been committed yet
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+
+ // Use Tephra APIs directly to finish (i.e. commit) the transaction
+ txContext.finish();
+
+ // Confirm that attempt to commit row with conflict fails
+ try {
+ connWithConflict.commit();
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+ } finally {
+ connWithConflict.close();
+ }
+
+ // New connection should now see data as it has been committed
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+ }
+
+ // Repeat the same as above, but this time abort the transaction
+ txContext = new TransactionContext(txServiceClient, txAware);
+ txContext.start();
+
+ // Use HBase APIs to add a new row
+ put = new Put(Bytes.toBytes("j"));
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e"));
+ txAware.put(put);
+
+ // Use Phoenix APIs to add new row (sharing the transaction context)
+ pconn.setTransactionContext(txContext);
+ conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')");
+
+ // Existing connection should see data even though it hasn't been committed yet
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(5,rs.getInt(1));
+
+ connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')");
+ rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+
+ // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+ txContext.abort();
+
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+
+ // Should succeed since conflicting row was aborted
+ connWithConflict.commit();
+
+ // New connection should now see data as it has been committed
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ }
+
+ // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been
+ // written to hide it.
+ Result result = htable.get(new Get(Bytes.toBytes("j")));
+ assertTrue(result.isEmpty());
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ef135eb..36036ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -280,6 +280,8 @@ public enum SQLExceptionCode {
CANNOT_ALTER_TO_BE_TXN_IF_TXNS_DISABLED(1079, "44A10", "Cannot alter table to be transactional table if transactions are disabled"),
CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP(1080, "44A11", "Cannot create a transactional table if transactions are disabled"),
CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP(1081, "44A12", "Cannot alter table to be transactional table if transactions are disabled"),
+ TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"),
+ TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
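Both new codes surface to callers as ordinary SQLExceptions. A minimal sketch of the failure path, assuming transactions are left disabled and reusing the pconn/txContext names from the test code elsewhere in this commit:

    try {
        pconn.setTransactionContext(txContext); // transactions disabled on this connection
        fail();
    } catch (SQLException e) {
        assertEquals(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT.getErrorCode(),
            e.getErrorCode());
    }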
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index eb2c77b..0ecce7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -124,6 +124,7 @@ public class MutationState implements SQLCloseable {
private long sizeOffset;
private int numRows = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
+ private boolean isExternalTxContext = false;
private final MutationMetricQueue mutationMetricQueue;
private ReadMetricQueue readMetricQueue;
@@ -170,6 +171,7 @@ public class MutationState implements SQLCloseable {
.getQueryServices().getTransactionSystemClient();
this.txContext = new TransactionContext(txServiceClient);
} else {
+ isExternalTxContext = true;
this.txContext = txContext;
}
} else {
@@ -224,7 +226,12 @@ public class MutationState implements SQLCloseable {
boolean hasUncommittedData = false;
for (TableRef source : sources) {
String sourcePhysicalName = source.getTable().getPhysicalName().getString();
- if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) {
+ // Tracking uncommitted physical table names is an optimization that prevents us from
+ // having to do a checkpoint if no data has yet been written. If we're using an
+ // external transaction context, it's possible that data was already written at the
+ // current transaction timestamp, so we always checkpoint in that case if we're
+ // reading and writing to the same table.
+ if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) {
hasUncommittedData = true;
break;
}
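To make the checkpoint condition above concrete, here is a minimal sketch reusing fullTableName, txServiceClient, txAware, and pconn from the integration test earlier in this commit (the UPSERT SELECT itself is illustrative):

    TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
    txContext.start();

    // Rows written through the raw HBase API land at the current transaction
    // timestamp before Phoenix has sent anything, so uncommittedPhysicalNames
    // is still empty at this point.
    Put put = new Put(Bytes.toBytes("m"));
    put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
    put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("v"));
    txAware.put(put);

    // Sharing the context sets isExternalTxContext, so an UPSERT SELECT that
    // reads and writes the same physical table always checkpoints and cannot
    // observe the rows it writes itself.
    pconn.setTransactionContext(txContext);
    pconn.createStatement().executeUpdate(
        "UPSERT INTO " + fullTableName + " SELECT * FROM " + fullTableName);

    txContext.finish();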
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 68312c6..d301321 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -106,6 +106,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
+import co.cask.tephra.TransactionContext;
+
/**
*
@@ -124,11 +126,12 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private final String url;
private final ConnectionQueryServices services;
private final Properties info;
- private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
private final Map<PDataType<?>, Format> formatters = new HashMap<>();
- private final MutationState mutationState;
private final int mutateBatchSize;
private final Long scn;
+ private MutationState mutationState;
+ private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
+ private boolean isAutoFlush = false;
private boolean isAutoCommit = false;
private PMetaData metaData;
private final PName tenantId;
@@ -158,6 +161,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException {
this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade);
this.isAutoCommit = connection.isAutoCommit;
+ this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
this.statementExecutionCounter = connection.statementExecutionCounter;
}
@@ -177,6 +181,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade());
this.isAutoCommit = connection.isAutoCommit;
+ this.isAutoFlush = connection.isAutoFlush;
this.sampler = connection.sampler;
this.statementExecutionCounter = connection.statementExecutionCounter;
}
@@ -216,6 +221,8 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
checkScn(scnParam);
this.scn = scnParam;
+ this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
+ && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH);
this.isAutoCommit = JDBCUtil.getAutoCommit(
url, this.info,
this.services.getProps().getBoolean(
@@ -276,17 +283,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
private static Properties filterKnownNonProperties(Properties info) {
Properties prunedProperties = info;
- if (info.contains(PhoenixRuntime.CURRENT_SCN_ATTRIB)) {
- if (prunedProperties == info) {
- prunedProperties = PropertiesUtil.deepCopy(info);
- }
- prunedProperties.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
- }
- if (info.contains(PhoenixRuntime.TENANT_ID_ATTRIB)) {
- if (prunedProperties == info) {
- prunedProperties = PropertiesUtil.deepCopy(info);
+ for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
+ if (info.containsKey(property)) {
+ if (prunedProperties == info) {
+ prunedProperties = PropertiesUtil.deepCopy(info);
+ }
+ prunedProperties.remove(property);
}
- prunedProperties.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
}
return prunedProperties;
}
@@ -573,6 +576,35 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
return isAutoCommit;
}
+ public boolean getAutoFlush() {
+ return isAutoFlush;
+ }
+
+ public void setAutoFlush(boolean autoFlush) throws SQLException {
+ if (autoFlush && !this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH)
+ .build().buildException();
+ }
+ this.isAutoFlush = autoFlush;
+ }
+
+ public void flush() throws SQLException {
+ mutationState.sendUncommitted();
+ }
+
+ public void setTransactionContext(TransactionContext txContext) throws SQLException {
+ if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
+ .build().buildException();
+ }
+ this.mutationState.rollback();
+ this.mutationState = new MutationState(this.mutationState.getMaxSize(), this, txContext);
+
+ // Write data to HBase after each statement execution as the commit may not
+ // come through Phoenix APIs.
+ setAutoFlush(true);
+ }
+
@Override
public String getCatalog() throws SQLException {
return "";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 76836d5..86a139b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1165,6 +1165,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
PhoenixPreparedStatement statement = batch.get(i);
returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
}
+ // Flush all changes in batch if auto flush is true
+ flushIfNecessary();
// If we make it all the way through, clear the batch
clearBatch();
return returnCodes;
@@ -1269,9 +1271,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
.build().buildException();
}
- return executeMutation(stmt);
+ int updateCount = executeMutation(stmt);
+ flushIfNecessary();
+ return updateCount;
}
+ private void flushIfNecessary() throws SQLException {
+ if (connection.getAutoFlush()) {
+ connection.flush();
+ }
+ }
+
@Override
public boolean execute(String sql) throws SQLException {
CompilableStatement stmt = parseStatement(sql);
@@ -1281,6 +1291,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
.build().buildException();
}
executeMutation(stmt);
+ flushIfNecessary();
return false;
}
executeQuery(stmt);
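The effect on batches, as a short sketch (table T is an assumption; auto flush is on because a transaction context was passed through, or because phoenix.transactions.autoFlush was set):

    Statement stmt = conn.createStatement();
    stmt.addBatch("UPSERT INTO T VALUES ('a', '1')");
    stmt.addBatch("UPSERT INTO T VALUES ('b', '2')");
    // With auto flush enabled, executeBatch() pushes both rows to HBase here via
    // flushIfNecessary(), without waiting for conn.commit().
    stmt.executeBatch();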
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c105e9d..7fc6e0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -84,7 +84,7 @@ public interface QueryServices extends SQLCloseable {
public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
- // Deprecated. Use FORCE_ROW_KEY_ORDER instead.
+ @Deprecated // Use FORCE_ROW_KEY_ORDER instead.
public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable";
public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes";
@@ -162,9 +162,14 @@ public interface QueryServices extends SQLCloseable {
public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default";
- public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
+ // Transaction related configs
+ public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
+ // Controls whether or not uncommitted data is automatically sent to HBase
+ // at the end of a statement execution when transaction state is passed through.
+ public static final String AUTO_FLUSH_ATTRIB = "phoenix.transactions.autoFlush";
+
// rpc queue configs
public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
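Both settings can be supplied as connection properties, mirroring how the tests in this commit enable transactions (the property names are the constants above; the URL is illustrative):

    Properties props = new Properties();
    props.setProperty("phoenix.transactions.enabled", "true");
    // Opt in to sending uncommitted data to HBase after each statement; off by default.
    props.setProperty("phoenix.transactions.autoFlush", "true");
    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);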
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index e8f905a..605e987 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -71,8 +71,8 @@ import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
import java.util.HashSet;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
@@ -197,11 +197,13 @@ public class QueryServicesOptions {
// TODO Change this to true as part of PHOENIX-1543
// We'll also need this for transactions to work correctly
public static final boolean DEFAULT_AUTO_COMMIT = false;
- public static final boolean DEFAULT_TRANSACTIONAL = false;
public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false;
public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false;
public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
+ public static final boolean DEFAULT_TRANSACTIONAL = false;
+ public static final boolean DEFAULT_AUTO_FLUSH = false;
+
private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 4ed6913..4c65a46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -98,14 +98,6 @@ import com.google.common.collect.Lists;
*/
public class PhoenixRuntime {
/**
- * Use this connection property to control HBase timestamps
- * by specifying your own long timestamp value at connection time. All
- * queries will use this as the upper bound of the time range for scans
- * and DDL, and DML will use this as t he timestamp for key values.
- */
- public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
-
- /**
* Root for the JDBC URL that Phoenix accepts.
*/
public final static String JDBC_PROTOCOL = "jdbc:phoenix";
@@ -121,13 +113,12 @@ public class PhoenixRuntime {
public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
/**
- * Use this connection property to control the number of rows that are
- * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
- * It's only used when autoCommit is true and your source table is
- * different than your target table or your SELECT statement has a
- * GROUP BY clause.
+ * Use this connection property to control HBase timestamps
+ * by specifying your own long timestamp value at connection time. All
+ * queries will use this as the upper bound of the time range for scans
+ * and DDL, and DML will use this as the timestamp for key values.
*/
- public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
+ public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
/**
* Use this connection property to help with fairness of resource allocation
@@ -139,11 +130,13 @@ public class PhoenixRuntime {
public static final String TENANT_ID_ATTRIB = "TenantId";
/**
- * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix.
- * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and
- * having these annotation automatically passed into log lines and traces by Phoenix.
+ * Use this connection property to control the number of rows that are
+ * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
+ * It's only used when autoCommit is true and your source table is
+ * different than your target table or your SELECT statement has a
+ * GROUP BY clause.
*/
- public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
+ public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
/**
* Use this connection property to explicitly enable or disable auto-commit on a new connection.
@@ -151,6 +144,29 @@
public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
/**
+<<<<<<< HEAD
+=======
+ * Use this connection property to explicitly set read consistency level on a new connection.
+ */
+ public static final String CONSISTENCY_ATTRIB = "Consistency";
+
+ /**
+ * Use this connection property to explicitly enable or disable request level metric collection.
+ */
+ public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
+
+ /**
+ * All Phoenix specific connection properties
+ * TODO: use enum instead
+ */
+ public final static String[] CONNECTION_PROPERTIES = {
+ CURRENT_SCN_ATTRIB,
+ TENANT_ID_ATTRIB,
+ UPSERT_BATCH_SIZE_ATTRIB,
+ AUTO_COMMIT_ATTRIB,
+ CONSISTENCY_ATTRIB,
+ REQUEST_METRIC_ATTRIB
+ };
+
+ /**
+>>>>>>> 5180ae0... PHOENIX-2411 Allow Phoenix to participate as transactional component
* Use this as the zookeeper quorum name to have a connection-less connection. This enables
* Phoenix-compatible HFiles to be created in a map/reduce job by creating tables,
* upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)}
@@ -158,9 +177,11 @@ public class PhoenixRuntime {
public final static String CONNECTIONLESS = "none";
/**
- * Use this connection property to explicitly enable or disable request level metric collection.
+ * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix.
+ * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and
+ * having these annotation automatically passed into log lines and traces by Phoenix.
*/
- public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
+ public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
private static final String HEADER_IN_LINE = "in-line";
private static final String SQL_FILE_EXT = ".sql";
[4/7] phoenix git commit: PHOENIX-2549 Modify Phoenix
DatabaseMetaData implementation for transaction support
Posted by ja...@apache.org.
PHOENIX-2549 Modify Phoenix DatabaseMetaData implementation for transaction support
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8b3ca756
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8b3ca756
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8b3ca756
Branch: refs/heads/4.x-HBase-0.98
Commit: 8b3ca756ca6a0008f9710ff69d1c1bce430d6080
Parents: 3172ed4
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Dec 30 10:43:30 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800
----------------------------------------------------------------------
.../phoenix/exception/SQLExceptionCode.java | 1 +
.../apache/phoenix/jdbc/PhoenixConnection.java | 12 ++++--
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 44 +++++---------------
.../apache/phoenix/jdbc/PhoenixDriverTest.java | 27 ++++++++++++
4 files changed, 48 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 36036ae..228f06f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -282,6 +282,7 @@ public enum SQLExceptionCode {
CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP(1081, "44A12", "Cannot alter table to be transactional table if transactions are disabled"),
TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"),
TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"),
+ TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL(1084, "44A15", "Cannot set isolation level to TRANSACTION_READ_COMMITTED or TRANSACTION_SERIALIZABLE if transactions are disabled"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 23d3235..04016c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -633,7 +633,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated {
@Override
public int getTransactionIsolation() throws SQLException {
- return Connection.TRANSACTION_READ_COMMITTED;
+ boolean transactionsEnabled = getQueryServices().getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED,
+ QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
+ return transactionsEnabled ?
+ Connection.TRANSACTION_SERIALIZABLE : Connection.TRANSACTION_READ_COMMITTED;
}
@Override
@@ -793,8 +796,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated {
@Override
public void setTransactionIsolation(int level) throws SQLException {
- if (level != Connection.TRANSACTION_READ_COMMITTED) {
- throw new SQLFeatureNotSupportedException();
+ boolean transactionsEnabled = getQueryServices().getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED,
+ QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
+ if (!transactionsEnabled && (level == Connection.TRANSACTION_REPEATABLE_READ || level == Connection.TRANSACTION_SERIALIZABLE)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL)
+ .build().buildException();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 87c02d0..a4748b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -73,30 +73,8 @@ import com.google.common.collect.Lists;
/**
*
- * JDBC DatabaseMetaData implementation of Phoenix reflecting read-only nature of driver.
- * Supported metadata methods include:
- * {@link #getTables(String, String, String, String[])}
- * {@link #getColumns(String, String, String, String)}
- * {@link #getTableTypes()}
- * {@link #getPrimaryKeys(String, String, String)}
- * {@link #getIndexInfo(String, String, String, boolean, boolean)}
- * {@link #getSchemas()}
- * {@link #getSchemas(String, String)}
- * {@link #getDatabaseMajorVersion()}
- * {@link #getDatabaseMinorVersion()}
- * {@link #getClientInfoProperties()}
- * {@link #getConnection()}
- * {@link #getDatabaseProductName()}
- * {@link #getDatabaseProductVersion()}
- * {@link #getDefaultTransactionIsolation()}
- * {@link #getDriverName()}
- * {@link #getDriverVersion()}
- * {@link #getSuperTables(String, String, String)}
- * {@link #getCatalogs()}
- * Other ResultSet methods return an empty result set.
- *
- *
- * @since 0.1
+ * JDBC DatabaseMetaData implementation of Phoenix.
+ *
*/
public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final int INDEX_NAME_INDEX = 4; // Shared with FAMILY_NAME_INDEX
@@ -1156,32 +1134,32 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
@Override
public boolean othersDeletesAreVisible(int type) throws SQLException {
- return true;
+ return false;
}
@Override
public boolean othersInsertsAreVisible(int type) throws SQLException {
- return true;
+ return false;
}
@Override
public boolean othersUpdatesAreVisible(int type) throws SQLException {
- return true;
+ return false;
}
@Override
public boolean ownDeletesAreVisible(int type) throws SQLException {
- return false;
+ return true;
}
@Override
public boolean ownInsertsAreVisible(int type) throws SQLException {
- return false;
+ return true;
}
@Override
public boolean ownUpdatesAreVisible(int type) throws SQLException {
- return false;
+ return true;
}
@Override
@@ -1387,7 +1365,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
@Override
public boolean supportsMultipleTransactions() throws SQLException {
- return false;
+ return true;
}
@Override
@@ -1534,12 +1512,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
@Override
public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
- return level == connection.getTransactionIsolation();
+ return true;
}
@Override
public boolean supportsTransactions() throws SQLException {
- return false;
+ return true;
}
@Override
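A brief sketch of what a client now sees through DatabaseMetaData when transactions are enabled (conn is assumed to be a Phoenix connection with phoenix.transactions.enabled set; the visibility answers reflect Tephra's snapshot isolation, where a transaction sees its own writes but not others'):

    DatabaseMetaData md = conn.getMetaData();
    assertTrue(md.supportsTransactions());
    assertTrue(md.supportsTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE));
    assertTrue(md.ownUpdatesAreVisible(ResultSet.TYPE_FORWARD_ONLY));     // own writes visible
    assertFalse(md.othersUpdatesAreVisible(ResultSet.TYPE_FORWARD_ONLY)); // snapshot isolation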
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index 61dc442..b70ea71 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -87,4 +87,31 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
assertEquals(SQLExceptionCode.INVALID_SCN.getErrorCode(), e.getErrorCode());
}
}
+
+ @Test
+ public void testDisallowIsolationLevel() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.setTransactionIsolation(Connection.TRANSACTION_NONE);
+ conn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+ conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ try {
+ conn = DriverManager.getConnection(getUrl());
+ conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL.getErrorCode(), e.getErrorCode());
+ }
+ try {
+ conn = DriverManager.getConnection(getUrl());
+ conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ fail();
+ } catch(SQLException e) {
+ assertEquals(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL.getErrorCode(), e.getErrorCode());
+ }
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
+ conn = DriverManager.getConnection(getUrl(), props);
+ conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+ conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ }
}