You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/12/31 02:53:26 UTC

[1/7] phoenix git commit: PHOENIX-2545 Abort transaction if send fails during commit

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 0f7dca652 -> 8b3ca756c


PHOENIX-2545 Abort transaction if send fails during commit


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7e4d39e6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7e4d39e6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7e4d39e6

Branch: refs/heads/4.x-HBase-0.98
Commit: 7e4d39e637d1ba765abed9e1d6ad73295575d2b9
Parents: 0f7dca6
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Dec 26 23:47:31 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:38:08 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/CreateTableIT.java   |  28 +++
 .../apache/phoenix/end2end/UpsertSelectIT.java  |   2 +-
 .../apache/phoenix/execute/PartialCommitIT.java |  44 ++--
 .../coprocessor/MetaDataEndpointImpl.java       |  13 +-
 .../apache/phoenix/execute/MutationState.java   | 216 ++++++++++++-------
 .../index/PhoenixTransactionalIndexer.java      |  20 +-
 6 files changed, 207 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 7c4576c..5ffc354 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
@@ -407,4 +408,31 @@ public class CreateTableIT extends BaseClientManagedTimeIT {
             assertEquals(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL.getErrorCode(),sqle.getErrorCode());
         }
     }
+    
+    @Test
+    public void testAlterDeletedTable() throws Exception {
+        String ddl = "create table T ("
+                + " K varchar primary key,"
+                + " V1 varchar)";
+        long ts = nextTimestamp();
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute(ddl); // create T on a connection whose SCN is ts
+        conn.close();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50));
+        Connection connAt50 = DriverManager.getConnection(getUrl(), props);
+        connAt50.createStatement().execute("DROP TABLE T"); // drop T at the later SCN ts+50
+        connAt50.close();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+20)); // between create and drop
+        Connection connAt20 = DriverManager.getConnection(getUrl(), props);
+        connAt20.createStatement().execute("UPDATE STATISTICS T"); // Invalidates from cache
+        try {
+            connAt20.createStatement().execute("ALTER TABLE T ADD V2 VARCHAR");
+            fail(); // the ALTER at SCN ts+20 is expected to be rejected
+        } catch (NewerTableAlreadyExistsException e) {
+            // Expected: T was dropped at SCN ts+50, which is newer than this connection's SCN (ts+20).
+        }
+        connAt20.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 689562a..b5252e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1109,7 +1109,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
         }
 
         // upsert data into base table without specifying the row timestamp column PK2
-        long upsertedTs = 5;
+        long upsertedTs = nextTimestamp();
         try (Connection conn = getConnection(upsertedTs)) {
             // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
             // creating a new row whose timestamp is the SCN of the connection. The same SCN will be used

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 61aae62..8d7ebcb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -100,7 +100,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return Arrays.asList(false, true);
     }
     
+    private final boolean transactional;
+    
     public PartialCommitIT(boolean transactional) {
+        this.transactional = transactional;
 		if (transactional) {
 			A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
 			B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
@@ -148,8 +151,8 @@ public class PartialCommitIT extends BaseOwnClusterIT {
     
     @Test
     public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), 0, new int[0], false,
-                                        singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1)));
+        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"),
+                                        singletonList(new Integer(1)));
     }
     
     @Test
@@ -157,10 +160,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')", 
                                        UPSERT_TO_FAIL, 
                                        "upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1,2} : new int[]{1}, true,
                                        newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
-                                       newArrayList(new Integer(2), new Integer(0)));
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                                       transactional ? newArrayList(new Integer(0), new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
@@ -172,10 +175,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')", 
                                        UPSERT_SELECT_TO_FAIL), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1} : new int[]{1}, true, 
                                        newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
-                                       newArrayList(new Integer(2), new Integer(0)));
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                                       transactional ? newArrayList(new Integer(1) /* from commit above */, new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
@@ -183,10 +186,10 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')", 
                                        DELETE_TO_FAIL,
                                        "upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
+                                       transactional ? new int[] {0,1,2} : new int[]{1}, true, 
                                        newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), 
-                                       newArrayList(new Integer(2), new Integer(1)));
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+                                       transactional ? newArrayList(new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(2), new Integer(1)));
     }
     
     /**
@@ -197,11 +200,11 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
                                        UPSERT_TO_FAIL, 
                                        "upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
-                                       2, new int[]{0,1}, true,
+                                       transactional ? new int[] {0,1,2} : new int[]{0,1}, true, 
                                        newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
                                                     "select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
-                                       newArrayList(new Integer(0), new Integer(1), new Integer(0)));
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                                       transactional ? newArrayList(new Integer(0), new Integer(0), new Integer(0)) : newArrayList(new Integer(0), new Integer(1), new Integer(0)));
     }
     
     @Test
@@ -211,15 +214,15 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                                        DELETE_TO_FAIL,
                                        "select * from " + A_SUCESS_TABLE + "", 
                                        UPSERT_TO_FAIL), 
-                                       2, new int[]{2,4}, true,
+                                       transactional ? new int[] {0,1,2,4} : new int[]{2,4}, true, 
                                        newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
                                                     "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), 
-                                       newArrayList(new Integer(4), new Integer(0), new Integer(1)));
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+                                       transactional ? newArrayList(new Integer(3) /* original rows */, new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(4), new Integer(0), new Integer(1)));
     }
     
-    private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
-                                   List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+    private void testPartialCommit(List<String> statements, int[] expectedUncommittedStatementIndexes, boolean willFail, List<String> countStatementsForVerification,
+                                   List<Integer> expectedCountsForVerification) {
         Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
         
         try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
@@ -241,7 +244,6 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                 }
                 assertEquals(CommitException.class, sqle.getClass());
                 int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
-                assertEquals(failureCount, uncommittedStatementIndexes.length);
                 assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
             }
             
@@ -267,7 +269,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations, null);
+                return new MutationState(maxSize, this, mutations, null, null);
             };
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 1280baa..2e931fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -977,13 +977,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         scan.setFilter(new FirstKeyOnlyFilter());
         scan.setRaw(true);
         List<Cell> results = Lists.<Cell> newArrayList();
-        try (RegionScanner scanner = region.getScanner(scan);) {
+        try (RegionScanner scanner = region.getScanner(scan)) {
           scanner.next(results);
         }
-        // HBase ignores the time range on a raw scan (HBASE-7362)
-        if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
-            Cell kv = results.get(0);
-            if (kv.getTypeByte() == Type.Delete.getCode()) {
+        for (Cell kv : results) {
+            KeyValue.Type type = Type.codeToType(kv.getTypeByte());
+            if (type == Type.DeleteFamily) { // Row was deleted
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                         GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = newDeletedTableMarker(kv.getTimestamp());
@@ -1621,7 +1620,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
                     // if not found then call newerTableExists and add delete marker for timestamp
                     // found
-                    if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
+                    table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
+                    if (table != null) {
+                        logger.info("Found newer table deleted as of " + table.getTimeStamp());
                         return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                                 EnvironmentEdgeManager.currentTimeMillis(), null);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 195dce3..eb2c77b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -68,6 +69,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PLong;
@@ -109,8 +111,9 @@ import co.cask.tephra.hbase98.TransactionAwareHTable;
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
+    private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     
-    private PhoenixConnection connection;
+    private final PhoenixConnection connection;
     private final long maxSize;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
     private final List<TransactionAware> txAwares;
@@ -120,35 +123,39 @@ public class MutationState implements SQLCloseable {
     private Transaction tx;
     private long sizeOffset;
     private int numRows = 0;
-    private boolean txStarted = false;
+    private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
     
     public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection, null);
+        this(maxSize,connection, null, null);
+    }
+    
+    public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) {
+        this(maxSize,connection, null, txContext);
     }
     
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction());
+        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null);
     }
     
     public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, connection, null, sizeOffset);
+        this(maxSize, connection, null, null, sizeOffset);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx) {
-        this(maxSize,connection, tx, 0);
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
+        this(maxSize,connection, tx, txContext, 0);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, long sizeOffset) {
-    	this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx);
+    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
+    	this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx, txContext);
         this.sizeOffset = sizeOffset;
     }
     
 	MutationState(long maxSize, PhoenixConnection connection,
 			Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
-			Transaction tx) {
+			Transaction tx, TransactionContext txContext) {
 		this.maxSize = maxSize;
 		this.connection = connection;
 		this.mutations = mutations;
@@ -157,26 +164,34 @@ public class MutationState implements SQLCloseable {
 				: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
 		this.tx = tx;
 		if (tx == null) {
-			this.txAwares = Collections.emptyList();
-			TransactionSystemClient txServiceClient = this.connection
-					.getQueryServices().getTransactionSystemClient();
-			this.txContext = new TransactionContext(txServiceClient);
+            this.txAwares = Collections.emptyList();
+		    if (txContext == null) {
+    			TransactionSystemClient txServiceClient = this.connection
+    					.getQueryServices().getTransactionSystemClient();
+    			this.txContext = new TransactionContext(txServiceClient);
+		    } else {
+		        this.txContext = txContext;
+		    }
 		} else {
 			// this code path is only used while running child scans, we can't pass the txContext to child scans
 			// as it is not thread safe, so we use the tx member variable
-			txAwares = Lists.newArrayList();
-			txContext = null;
+			this.txAwares = Lists.newArrayList();
+			this.txContext = null;
 		}
 	}
 
     public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, null, sizeOffset);
+        this(maxSize, connection, null, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
         this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
     
+    public long getMaxSize() { // read-only accessor for the maxSize given at construction
+        return maxSize;
+    }
+    
     public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
         Transaction currentTx = getTransaction();
         if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
@@ -308,9 +323,8 @@ public class MutationState implements SQLCloseable {
 		}
         
         try {
-            if (!txStarted) {
+            if (!isTransactionStarted()) {
                 txContext.start();
-                txStarted = true;
                 return true;
             }
         } catch (TransactionFailureException e) {
@@ -320,7 +334,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null);
+        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -599,18 +613,24 @@ public class MutationState implements SQLCloseable {
 	    Long scn = connection.getSCN();
 	    MetaDataClient client = new MetaDataClient(connection);
 	    long serverTimeStamp = tableRef.getTimeStamp();
-	    PTable table = tableRef.getTable();
 	    // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
 	    // so no need to do it again here.
 	    if (!connection.getAutoCommit()) {
+	        PTable table = tableRef.getTable();
             MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+            PTable resolvedTable = result.getTable();
+            if (resolvedTable == null) {
+                throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
+            }
+            // Always update the table in tableRef: the copy it holds may be stale if the table was
+            // updated in the cache after we executed the UPSERT VALUES call but before this point.
+            tableRef.setTable(resolvedTable);
             long timestamp = result.getMutationTime();
             if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
                 serverTimeStamp = timestamp;
                 if (result.wasUpdated()) {
                     // TODO: use bitset?
-                    table = result.getTable();
-                    PColumn[] columns = new PColumn[table.getColumns().size()];
+                    PColumn[] columns = new PColumn[resolvedTable.getColumns().size()];
                     for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
                     	RowMutationState valueEntry = rowEntry.getValue();
                         if (valueEntry != null) {
@@ -624,10 +644,9 @@ public class MutationState implements SQLCloseable {
                     }
                     for (PColumn column : columns) {
                         if (column != null) {
-                            table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+                            resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                         }
                     }
-                    tableRef.setTable(table);
                 }
             }
         }
@@ -731,25 +750,28 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
+        List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size());
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
 	        ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+	        boolean isTransactional;
 	        while (tableRefIterator.hasNext()) {
 	        	// at this point we are going through mutations for each table
-	            TableRef tableRef = tableRefIterator.next();
+	            final TableRef tableRef = tableRefIterator.next();
 	            Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
 	            if (valuesMap == null || valuesMap.isEmpty()) {
 	                continue;
 	            }
-	            PTable table = tableRef.getTable();
+                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
+                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
+	            final PTable table = tableRef.getTable();
 	            // Track tables to which we've sent uncommitted data
-	            if (table.isTransactional()) {
+	            if (isTransactional = table.isTransactional()) {
+                    txTableRefs.add(tableRef);
 	                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
 	            }
 	            boolean isDataTable = true;
-	            // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-	            long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
                 table.getIndexMaintainers(indexMetaDataPtr, connection);
 	            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
 	            while (mutationsIterator.hasNext()) {
@@ -776,7 +798,7 @@ public class MutationState implements SQLCloseable {
 	                    SQLException sqlE = null;
 	                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
 	                    try {
-	                        if (table.isTransactional()) {
+	                        if (isTransactional) {
 	                            // If we have indexes, wrap the HTable in a delegate HTable that
 	                            // will attach the necessary index meta data in the event of a
 	                            // rollback
@@ -830,8 +852,8 @@ public class MutationState implements SQLCloseable {
 	                            }
 	                            e = inferredE;
 	                        }
-	                        // Throw to client with both what was committed so far and what is left to be committed.
-	                        // That way, client can either undo what was done or try again with what was not done.
+	                        // Throw to client an exception that indicates the statements that
+	                        // were not committed successfully.
 	                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
 	                    } finally {
 	                        try {
@@ -862,17 +884,24 @@ public class MutationState implements SQLCloseable {
 	            if (tableRef.getTable().getType() != PTableType.INDEX) {
 	                numRows -= valuesMap.size();
 	            }
-	            // Remove batches as we process them
+	            // For transactions, track the statement indexes as we send data
+	            // over because our CommitException should include all statements
+	            // involved in the transaction since none of them would have been
+	            // committed in the event of a failure.
+	            if (isTransactional) {
+	                addUncommittedStatementIndexes(valuesMap.values());
+	            }
+                // Remove batches as we process them
 	            if (sendAll) {
-	            	tableRefIterator.remove(); // Iterating through actual map in this case
+	                // We are iterating over the map's key set in this case, so remove through
+	                // the iterator: removing from the map directly would throw a
+	                // ConcurrentModificationException.
+	            	tableRefIterator.remove();
 	            } else {
 	            	mutations.remove(tableRef);
 	            }
 	        }
         }
-        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
-        // now to only send the mutations for the tables we're querying, hence we've removed the
-        // assertions that we're here before.
     }
 
     public byte[] encodeTransaction() throws SQLException {
@@ -930,19 +959,17 @@ public class MutationState implements SQLCloseable {
         return cache;
     }
     
-    private void clear() throws SQLException {
-        this.mutations.clear();
-        numRows = 0;
+    private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) {
+        for (RowMutationState rowMutationState : rowMutations) {
+            uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, rowMutationState.getStatementIndexes());
+        }
     }
     
     private int[] getUncommittedStatementIndexes() {
-    	int[] result = new int[0];
-    	for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
-    		for (RowMutationState rowMutationState : rowMutations.values()) {
-    			result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
-    		}
+    	for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+    	    addUncommittedStatementIndexes(rowMutationMap.values());
     	}
-    	return result;
+    	return uncommittedStatementIndexes;
     }
     
     @Override
@@ -950,65 +977,96 @@ public class MutationState implements SQLCloseable {
     }
 
     private void reset() {
-        txStarted = false;
         tx = null;
+        txAwares.clear();
         uncommittedPhysicalNames.clear();
+        this.mutations.clear();
+        numRows = 0;
+        uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
     
     public void rollback() throws SQLException {
-        clear();
-        txAwares.clear();
-        if (txContext != null) {
-            try {
-                if (txStarted) {
+        try {
+            if (txContext != null && isTransactionStarted()) {
+                try {
                     txContext.abort();
+                } catch (TransactionFailureException e) {
+                    throw TransactionUtil.getTransactionFailureException(e);
                 }
-            } catch (TransactionFailureException e) {
-                throw new SQLException(e); // TODO: error code
-            } finally {
-            	reset();
             }
+        } finally {
+            reset();
         }
     }
     
     public void commit() throws SQLException {
-    	boolean sendMutationsFailed=false;
+    	boolean sendSuccessful=false;
+    	SQLException sqlE = null;
         try {
             send();
-        } catch (Throwable t) {
-        	sendMutationsFailed=true;
-        	throw t;
+            sendSuccessful=true;
+        } catch (SQLException e) {
+            sqlE = e;
         } finally {
-            txAwares.clear();
-            if (txContext != null) {
-                try {
-                    if (txStarted && !sendMutationsFailed) {
-                        txContext.finish();
-                    }
-                } catch (TransactionFailureException e) {
+            try {
+                if (txContext != null && isTransactionStarted()) {
+                    TransactionFailureException txFailure = null;
+                    boolean finishSuccessful=false;
                     try {
-                        txContext.abort(e);
-                        // abort and throw the original commit failure exception
-                        throw TransactionUtil.getTransactionFailureException(e);
-                    } catch (TransactionFailureException e1) {
-                        // if abort fails and throw the abort failure exception
-                        throw TransactionUtil.getTransactionFailureException(e1);
+                        if (sendSuccessful) {
+                            txContext.finish();
+                            finishSuccessful = true;
+                        }
+                    } catch (TransactionFailureException e) {
+                        txFailure = e;
+                        SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                        if (sqlE == null) {
+                            sqlE = nextE;
+                        } else {
+                            sqlE.setNextException(nextE);
+                        }
+                    } finally {
+                        // If send fails or finish fails, abort the tx
+                        if (!finishSuccessful) {
+                            try {
+                                txContext.abort(txFailure);
+                            } catch (TransactionFailureException e) {
+                                SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                                if (sqlE == null) {
+                                    sqlE = nextE;
+                                } else {
+                                    sqlE.setNextException(nextE);
+                                }
+                            }
+                        }
                     }
+                }
+            } finally {
+                try {
+                    reset();
                 } finally {
-                  	if (!sendMutationsFailed) {
-                  		reset();
-                  	}
-                  }
+                    if (sqlE != null) {
+                        throw sqlE;
+                    }
+                }
             }
         }
     }
 
     /**
+     * Send to HBase any uncommitted data for transactional tables.
+     * @return true if any data was sent and false otherwise.
+     * @throws SQLException
+     */
+    public boolean sendUncommitted() throws SQLException {
+        return sendUncommitted(mutations.keySet().iterator());
+    }
+    /**
      * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a
      * query. In this way, they are visible to subsequent reads but are not actually committed until
      * commit is called.
      * @param tableRefs
-     * @return true if at least partially transactional and false otherwise.
+     * @return true if any data was sent and false otherwise.
      * @throws SQLException
      */
     public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e4d39e6/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 75ad63d..0e00cc6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -49,9 +49,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.cloudera.htrace.Span;
-import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.TraceScope;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.hbase.index.MultiMutation;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -70,17 +67,20 @@ import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
 /**
  * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction
  * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a
@@ -196,7 +196,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         // run a scan to get the current state. We'll need this to delete
         // the existing index rows.
         Transaction tx = indexMetaData.getTransaction();
-        assert(tx != null);
+        if (tx == null) {
+            throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
+        }
         List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
         Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
         for (IndexMaintainer indexMaintainer : indexMaintainers) {


[7/7] phoenix git commit: PHOENIX-2546 Filters should take into account that multiple versions may be scanned

Posted by ja...@apache.org.
PHOENIX-2546 Filters should take into account that multiple versions may be scanned


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0baa1d0f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0baa1d0f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0baa1d0f

Branch: refs/heads/4.x-HBase-0.98
Commit: 0baa1d0fe9cad54ca249d9ca39303192fa5e5e59
Parents: 24d9c3d
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Dec 29 23:00:27 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/tx/TransactionIT.java    | 147 +++++++++++++++++++
 .../phoenix/filter/ColumnProjectionFilter.java  |   3 +-
 .../MultiCFCQKeyValueComparisonFilter.java      |   1 -
 .../filter/MultiCQKeyValueComparisonFilter.java |   2 -
 .../filter/MultiKeyValueComparisonFilter.java   |  69 ++++-----
 .../phoenix/filter/RowKeyComparisonFilter.java  |  11 +-
 .../SingleCFCQKeyValueComparisonFilter.java     |   3 -
 .../SingleCQKeyValueComparisonFilter.java       |   3 -
 .../filter/SingleKeyValueComparisonFilter.java  |  19 +--
 .../apache/phoenix/filter/SkipScanFilter.java   |   6 +-
 .../schema/tuple/SingleKeyValueTuple.java       |  46 +++---
 .../phoenix/filter/SkipScanFilterTest.java      |   2 +-
 12 files changed, 217 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 5a322bb..1aabb03 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -350,6 +351,7 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         assertFalse(rs.next());
     }
     
+    @Ignore
     @Test
     public void testNonTxToTxTableFailure() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
@@ -725,4 +727,149 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         Result result = htable.get(new Get(Bytes.toBytes("j")));
         assertTrue(result.isEmpty());
     }
+
+    @Test
+    public void testCheckpointAndRollback() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            String fullTableName = "T";
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true");
+            stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
+            conn.commit();
+            
+            stmt.executeUpdate("upsert into " + fullTableName + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName);
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("aa", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName);
+            
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("aaa", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            // assert original row exists in fullTableName
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Ignore("Add back once TEPHRA-162 gets fixed")
+    @Test
+    public void testInflightUpdateNotSeen() throws Exception {
+        String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(true);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            conn1.commit();
+            
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME + " WHERE int_col1 IS NULL");
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            
+            upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, int_col1) VALUES(?, ?, ?, ?, ?, ?, 1)";
+            stmt = conn1.prepareStatement(upsert);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            
+            rs = conn1.createStatement().executeQuery("SELECT int_col1 FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            rs = conn2.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+            assertFalse(rs.next());
+            
+            conn1.commit();
+            
+            rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            rs = conn2.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " WHERE int_col1 = 1");
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Ignore("Add back once TEPHRA-162 gets fixed")
+    @Test
+    public void testInflightDeleteNotSeen() throws Exception {
+        String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(true);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            conn1.commit();
+            
+            rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            
+            String delete = "DELETE FROM " + FULL_TABLE_NAME + " WHERE varchar_pk = 'varchar1'";
+            stmt = conn1.prepareStatement(delete);
+            int count = stmt.executeUpdate();
+            assertEquals(1,count);
+            
+            rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            conn1.commit();
+            
+            rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index d074c56..b8b0350 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -177,6 +176,6 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
     
     @Override
     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
-      return ReturnCode.INCLUDE;
+      return ReturnCode.INCLUDE_AND_NEXT_COL;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
index 9147f1a..3bd1fd7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCFCQKeyValueComparisonFilter.java
@@ -31,7 +31,6 @@ import org.apache.phoenix.expression.Expression;
  * are references to multiple column qualifiers over multiple column families.
  * Also there same qualifier names in different families.
  * 
- * @since 0.1
  */
 public class MultiCFCQKeyValueComparisonFilter extends MultiKeyValueComparisonFilter {
     private final ImmutablePairBytesPtr ptr = new ImmutablePairBytesPtr();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
index 5fa5035..91e4392 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
@@ -29,8 +29,6 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
  * Filter that evaluates WHERE clause expression, used in the case where there
  * are references to multiple unique column qualifiers over one or more column families.
  *
- * 
- * @since 0.1
  */
 public class MultiCQKeyValueComparisonFilter extends MultiKeyValueComparisonFilter {
     private ImmutableBytesPtr ptr = new ImmutableBytesPtr();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index 1cb2255..569faa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -24,8 +24,6 @@ import java.util.Map;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.expression.Expression;
@@ -41,8 +39,6 @@ import org.apache.phoenix.schema.tuple.BaseTuple;
  * but for general expression evaluation in the case where multiple KeyValue
  * columns are referenced in the expression.
  *
- * 
- * @since 0.1
  */
 public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFilter {
     private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
@@ -59,14 +55,14 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         init();
     }
 
-    private static final class KeyValueRef {
-        public KeyValue keyValue;
+    private static final class CellRef {
+        public Cell cell;
         
         @Override
         public String toString() {
-            if(keyValue != null) {
-                return keyValue.toString() + " value = " + Bytes.toStringBinary(
-                		keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength());
+            if(cell != null) {
+                return cell.toString() + " value = " + Bytes.toStringBinary(
+                		cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
             } else {
                 return super.toString();
             }
@@ -79,13 +75,13 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
     private final class IncrementalResultTuple extends BaseTuple {
         private int refCount;
         private final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
-        private final Map<Object,KeyValueRef> foundColumns = new HashMap<Object,KeyValueRef>(5);
+        private final Map<Object,CellRef> foundColumns = new HashMap<Object,CellRef>(5);
         
         public void reset() {
             refCount = 0;
             keyPtr.set(UNITIALIZED_KEY_BUFFER);
-            for (KeyValueRef ref : foundColumns.values()) {
-                ref.keyValue = null;
+            for (CellRef ref : foundColumns.values()) {
+                ref.cell = null;
             }
         }
         
@@ -98,39 +94,39 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
             refCount = foundColumns.size();
         }
         
-        public ReturnCode resolveColumn(KeyValue value) {
+        public ReturnCode resolveColumn(Cell value) {
             // Always set key, in case we never find a key value column of interest,
             // and our expression uses row key columns.
             setKey(value);
             Object ptr = setColumnKey(value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), 
             		value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength());
-            KeyValueRef ref = foundColumns.get(ptr);
+            CellRef ref = foundColumns.get(ptr);
             if (ref == null) {
-                // Return INCLUDE here. Although this filter doesn't need this KV
+                // Return INCLUDE_AND_NEXT_COL here. Although this filter doesn't need this KV
                 // it should still be projected into the Result
-                return ReturnCode.INCLUDE;
+                return ReturnCode.INCLUDE_AND_NEXT_COL;
             }
             // Since we only look at the latest key value for a given column,
             // we are not interested in older versions
             // TODO: test with older versions to confirm this doesn't get tripped
             // This shouldn't be necessary, because a scan only looks at the latest
             // version
-            if (ref.keyValue != null) {
+            if (ref.cell != null) {
                 // Can't do NEXT_ROW, because then we don't match the other columns
                 // SKIP, INCLUDE, and NEXT_COL seem to all act the same
                 return ReturnCode.NEXT_COL;
             }
-            ref.keyValue = value;
+            ref.cell = value;
             refCount++;
             return null;
         }
         
         public void addColumn(byte[] cf, byte[] cq) {
             Object ptr = MultiKeyValueComparisonFilter.this.newColumnKey(cf, 0, cf.length, cq, 0, cq.length);
-            foundColumns.put(ptr, new KeyValueRef());
+            foundColumns.put(ptr, new CellRef());
         }
         
-        public void setKey(KeyValue value) {
+        public void setKey(Cell value) {
             keyPtr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength());
         }
         
@@ -140,10 +136,10 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         }
         
         @Override
-        public KeyValue getValue(byte[] cf, byte[] cq) {
+        public Cell getValue(byte[] cf, byte[] cq) {
             Object ptr = setColumnKey(cf, 0, cf.length, cq, 0, cq.length);
-            KeyValueRef ref = foundColumns.get(ptr);
-            return ref == null ? null : ref.keyValue;
+            CellRef ref = foundColumns.get(ptr);
+            return ref == null ? null : ref.cell;
         }
         
         @Override
@@ -157,15 +153,15 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         }
 
         @Override
-        public KeyValue getValue(int index) {
+        public Cell getValue(int index) {
             // This won't perform very well, but it's not
             // currently used anyway
-            for (KeyValueRef ref : foundColumns.values()) {
-                if (ref.keyValue == null) {
+            for (CellRef ref : foundColumns.values()) {
+                if (ref.cell == null) {
                     continue;
                 }
                 if (index == 0) {
-                    return ref.keyValue;
+                    return ref.cell;
                 }
                 index--;
             }
@@ -175,10 +171,10 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         @Override
         public boolean getValue(byte[] family, byte[] qualifier,
                 ImmutableBytesWritable ptr) {
-            KeyValue kv = getValue(family, qualifier);
-            if (kv == null)
+            Cell cell = getValue(family, qualifier);
+            if (cell == null)
                 return false;
-            ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+            ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
             return true;
         }
     }
@@ -197,17 +193,17 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
     }
     
     @Override
-    public ReturnCode filterKeyValue(Cell keyValue) {
+    public ReturnCode filterKeyValue(Cell cell) {
         if (Boolean.TRUE.equals(this.matchedColumn)) {
           // We already found and matched the single column, all keys now pass
-          return ReturnCode.INCLUDE;
+          return ReturnCode.INCLUDE_AND_NEXT_COL;
         }
         if (Boolean.FALSE.equals(this.matchedColumn)) {
           // We found all the columns, but did not match the expression, so skip to next row
           return ReturnCode.NEXT_ROW;
         }
         // This is a key value we're not interested in (TODO: why INCLUDE here instead of NEXT_COL?)
-        ReturnCode code = inputTuple.resolveColumn(KeyValueUtil.ensureKeyValue(keyValue));
+        ReturnCode code = inputTuple.resolveColumn(cell);
         if (code != null) {
             return code;
         }
@@ -220,10 +216,10 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
             if (inputTuple.isImmutable()) {
                 this.matchedColumn = Boolean.FALSE;
             } else {
-                return ReturnCode.INCLUDE;
+                return ReturnCode.INCLUDE_AND_NEXT_COL;
             }
         }
-        return this.matchedColumn ? ReturnCode.INCLUDE : ReturnCode.NEXT_ROW;
+        return this.matchedColumn ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW;
     }
 
     @Override
@@ -243,8 +239,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         super.reset();
     }
 
-    @SuppressWarnings("all")
-    // suppressing missing @Override since this doesn't exist for HBase 0.94.4
+    @Override
     public boolean isFamilyEssential(byte[] name) {
         // Only the column families involved in the expression are essential.
         // The others are for columns projected in the select expression.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
index b7de7ac..2eb69bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/RowKeyComparisonFilter.java
@@ -22,7 +22,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory;
  *
  * Filter for use when expressions only reference row key columns
  *
- * 
- * @since 0.1
  */
 public class RowKeyComparisonFilter extends BooleanExpressionFilter {
     private static final Logger logger = LoggerFactory.getLogger(RowKeyComparisonFilter.class);
@@ -79,7 +76,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
             }
             evaluate = false;
         }
-        return keepRow ? ReturnCode.INCLUDE : ReturnCode.NEXT_ROW;
+        return keepRow ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW;
     }
 
     private final class RowKeyTuple extends BaseTuple {
@@ -99,7 +96,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
         }
 
         @Override
-        public KeyValue getValue(byte[] cf, byte[] cq) {
+        public Cell getValue(byte[] cf, byte[] cq) {
             return null;
         }
 
@@ -119,7 +116,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
         }
 
         @Override
-        public KeyValue getValue(int index) {
+        public Cell getValue(int index) {
             throw new IndexOutOfBoundsException(Integer.toString(index));
         }
 
@@ -135,7 +132,7 @@ public class RowKeyComparisonFilter extends BooleanExpressionFilter {
         return !this.keepRow;
     }
 
-    @SuppressWarnings("all") // suppressing missing @Override since this doesn't exist for HBase 0.94.4
+    @Override
     public boolean isFamilyEssential(byte[] name) {
         // We only need our "guaranteed to have a key value" column family,
         // which we pass in and serialize through. In the case of a VIEW where

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
index 963fe59..c63673c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCFCQKeyValueComparisonFilter.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
-
 import org.apache.phoenix.expression.Expression;
 
 
@@ -32,8 +31,6 @@ import org.apache.phoenix.expression.Expression;
  * column qualifier parts of the key value to disambiguate with another similarly
  * named column qualifier in a different column family.
  *
- * 
- * @since 0.1
  */
 public class SingleCFCQKeyValueComparisonFilter extends SingleKeyValueComparisonFilter {
     public SingleCFCQKeyValueComparisonFilter() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
index 240c8a6..0d904bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
-
 import org.apache.phoenix.expression.Expression;
 
 
@@ -32,8 +31,6 @@ import org.apache.phoenix.expression.Expression;
  * part of the key value since the column qualifier is unique across all column
  * families.
  *
- * 
- * @since 0.1
  */
 public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFilter {
     public SingleCQKeyValueComparisonFilter() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index 8929f8a..eaf8d35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -37,8 +36,6 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
  * but for general expression evaluation in the case where only a single KeyValue
  * column is referenced in the expression.
  *
- * 
- * @since 0.1
  */
 public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFilter {
     private final SingleKeyValueTuple inputTuple = new SingleKeyValueTuple();
@@ -76,8 +73,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
     public ReturnCode filterKeyValue(Cell keyValue) {
         if (this.matchedColumn) {
           // We already found and matched the single column, all keys now pass
-          // TODO: why won't this cause earlier versions of a kv to be included?
-          return ReturnCode.INCLUDE;
+          return ReturnCode.INCLUDE_AND_NEXT_COL;
         }
         if (this.foundColumn()) {
           // We found all the columns, but did not match the expression, so skip to next row
@@ -87,19 +83,18 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
                 keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength()) != 0) {
             // Remember the key in case this is the only key value we see.
             // We'll need it if we have row key columns too.
-            inputTuple.setKey(KeyValueUtil.ensureKeyValue(keyValue));
+            inputTuple.setKey(keyValue);
             // This is a key value we're not interested in
-            // TODO: use NEXT_COL when bug fix comes through that includes the row still
-            return ReturnCode.INCLUDE;
+            return ReturnCode.INCLUDE_AND_NEXT_COL;
         }
-        inputTuple.setKeyValue(KeyValueUtil.ensureKeyValue(keyValue));
+        inputTuple.setCell(keyValue);
 
         // We have the columns, so evaluate here
         if (!Boolean.TRUE.equals(evaluate(inputTuple))) {
             return ReturnCode.NEXT_ROW;
         }
         this.matchedColumn = true;
-        return ReturnCode.INCLUDE;
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
     }
 
     @Override
@@ -124,7 +119,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
         return true;
     }
 
-      @Override
+    @Override
     public void reset() {
         inputTuple.reset();
         matchedColumn = false;
@@ -137,7 +132,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
         init();
     }
 
-    @SuppressWarnings("all") // suppressing missing @Override since this doesn't exist for HBase 0.94.4
+    @Override
     public boolean isFamilyEssential(byte[] name) {
         // Only the column families involved in the expression are essential.
         // The others are for columns projected in the select expression

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 667b3d7..77b4cf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -274,7 +274,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
         // more than we need. We can optimize this by tracking whether each range in each slot position
         // intersects.
         ReturnCode endCode = navigate(upperExclusiveKey, 0, upperExclusiveKey.length, Terminate.AT);
-        if (endCode == ReturnCode.INCLUDE) {
+        if (endCode == ReturnCode.INCLUDE || endCode == ReturnCode.INCLUDE_AND_NEXT_COL) {
             setStartKey();
             // If the upperExclusiveKey is equal to the start key, we've gone one position too far, since
             // our upper key is exclusive. In that case, go to the previous key
@@ -358,7 +358,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
         // First check to see if we're in-range until we reach our end key
         if (endKeyLength > 0) {
             if (Bytes.compareTo(currentKey, offset, length, endKey, 0, endKeyLength) < 0) {
-                return ReturnCode.INCLUDE;
+                return ReturnCode.INCLUDE_AND_NEXT_COL;
             }
 
             // If key range of last slot is a single key, we can increment our position
@@ -490,7 +490,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
         // up to the upper range of our last slot. We do this for ranges and single keys
         // since we potentially have multiple key values for the same row key.
         setEndKey(ptr, minOffset, i);
-        return ReturnCode.INCLUDE;
+        return ReturnCode.INCLUDE_AND_NEXT_COL;
     }
 
     private boolean allTrailingNulls(int i) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
index 8226208..e35eb13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/SingleKeyValueTuple.java
@@ -17,24 +17,24 @@
  */
 package org.apache.phoenix.schema.tuple;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 
 public class SingleKeyValueTuple extends BaseTuple {
     private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
-    private KeyValue keyValue;
+    private Cell cell;
     private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
     
     public SingleKeyValueTuple() {
     }
     
-    public SingleKeyValueTuple(KeyValue keyValue) {
-        if (keyValue == null) {
+    public SingleKeyValueTuple(Cell cell) {
+        if (cell == null) {
             throw new NullPointerException();
         }
-        setKeyValue(keyValue);
+        setCell(cell);
     }
     
     public boolean hasKey() {
@@ -42,28 +42,27 @@ public class SingleKeyValueTuple extends BaseTuple {
     }
     
     public void reset() {
-        this.keyValue = null;
+        this.cell = null;
         keyPtr.set(UNITIALIZED_KEY_BUFFER);
     }
     
-    public void setKeyValue(KeyValue keyValue) {
-        if (keyValue == null) {
+    public void setCell(Cell cell) {
+        if (cell == null) {
             throw new IllegalArgumentException();
         }
-        this.keyValue = keyValue;
-        setKey(keyValue);
+        this.cell = cell;
+        setKey(cell);
     }
     
     public void setKey(ImmutableBytesWritable ptr) {
         keyPtr.set(ptr.get(), ptr.getOffset(), ptr.getLength());
     }
     
-    @SuppressWarnings("deprecation")
-    public void setKey(KeyValue keyValue) {
-        if (keyValue == null) {
+    public void setKey(Cell cell) {
+        if (cell == null) {
             throw new IllegalArgumentException();
         }
-        keyPtr.set(keyValue.getBuffer(), keyValue.getRowOffset(), keyValue.getRowLength());
+        keyPtr.set(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
     }
     
     @Override
@@ -72,8 +71,8 @@ public class SingleKeyValueTuple extends BaseTuple {
     }
     
     @Override
-    public KeyValue getValue(byte[] cf, byte[] cq) {
-        return keyValue;
+    public Cell getValue(byte[] cf, byte[] cq) {
+        return cell;
     }
 
     @Override
@@ -83,29 +82,28 @@ public class SingleKeyValueTuple extends BaseTuple {
     
     @Override
     public String toString() {
-        return "SingleKeyValueTuple[" + keyValue == null ? keyPtr.get() == UNITIALIZED_KEY_BUFFER ? "null" : Bytes.toStringBinary(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()) : keyValue.toString() + "]";
+        return "SingleKeyValueTuple[" + (cell == null ? keyPtr.get() == UNITIALIZED_KEY_BUFFER ? "null" : Bytes.toStringBinary(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()) : cell.toString()) + "]"; // parenthesized: '+' binds tighter than '==' and '?:'
     }
 
     @Override
     public int size() {
-        return keyValue == null ? 0 : 1;
+        return cell == null ? 0 : 1;
     }
 
     @Override
-    public KeyValue getValue(int index) {
-        if (index != 0 || keyValue == null) {
+    public Cell getValue(int index) {
+        if (index != 0 || cell == null) {
             throw new IndexOutOfBoundsException(Integer.toString(index));
         }
-        return keyValue;
+        return cell;
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public boolean getValue(byte[] family, byte[] qualifier,
             ImmutableBytesWritable ptr) {
-        if (keyValue == null)
+        if (cell == null)
             return false;
-        ptr.set(keyValue.getBuffer(), keyValue.getValueOffset(), keyValue.getValueLength());
+        ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0baa1d0f/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
index 898f778..4cb71ff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
@@ -402,7 +402,7 @@ public class SkipScanFilterTest extends TestCase {
             skipper.reset();
             assertFalse(skipper.filterAllRemaining());
             assertFalse(skipper.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()));
-            assertEquals(kv.toString(), ReturnCode.INCLUDE, skipper.filterKeyValue(kv));
+            assertEquals(kv.toString(), ReturnCode.INCLUDE_AND_NEXT_COL, skipper.filterKeyValue(kv));
         }
 
         @Override public String toString() {


[3/7] phoenix git commit: PHOENIX-2551 Remove Jdbc7Shim as it's no longer necessary

Posted by ja...@apache.org.
PHOENIX-2551 Remove Jdbc7Shim as it's no longer necessary


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e44e7a4e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e44e7a4e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e44e7a4e

Branch: refs/heads/4.x-HBase-0.98
Commit: e44e7a4e5061f0deee30ace0ff9bda515798d397
Parents: 0baa1d0
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Dec 30 09:19:41 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800

----------------------------------------------------------------------
 .../phoenix/trace/DelegateConnection.java       | 324 ++++++++++++++++++
 .../phoenix/trace/DelegatingConnection.java     | 328 -------------------
 .../phoenix/trace/PhoenixTracingEndToEndIT.java |   3 +-
 .../java/org/apache/phoenix/jdbc/Jdbc7Shim.java |  64 ----
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   2 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +-
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     |   3 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |   2 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 9 files changed, 330 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java
new file mode 100644
index 0000000..8fff469
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegateConnection.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * Simple {@link Connection} that just delegates to an underlying {@link Connection}.
+ */
+public class DelegateConnection implements Connection {
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return conn.unwrap(iface);
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return conn.isWrapperFor(iface);
+  }
+
+  @Override
+  public Statement createStatement() throws SQLException {
+    return conn.createStatement();
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return conn.prepareStatement(sql);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return conn.prepareCall(sql);
+  }
+
+  @Override
+  public String nativeSQL(String sql) throws SQLException {
+    return conn.nativeSQL(sql);
+  }
+
+  @Override
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+    conn.setAutoCommit(autoCommit);
+  }
+
+  @Override
+  public boolean getAutoCommit() throws SQLException {
+    return conn.getAutoCommit();
+  }
+
+  @Override
+  public void commit() throws SQLException {
+    conn.commit();
+  }
+
+  @Override
+  public void rollback() throws SQLException {
+    conn.rollback();
+  }
+
+  @Override
+  public void close() throws SQLException {
+    conn.close();
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return conn.isClosed();
+  }
+
+  @Override
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return conn.getMetaData();
+  }
+
+  @Override
+  public void setReadOnly(boolean readOnly) throws SQLException {
+    conn.setReadOnly(readOnly);
+  }
+
+  @Override
+  public boolean isReadOnly() throws SQLException {
+    return conn.isReadOnly();
+  }
+
+  @Override
+  public void setCatalog(String catalog) throws SQLException {
+    conn.setCatalog(catalog);
+  }
+
+  @Override
+  public String getCatalog() throws SQLException {
+    return conn.getCatalog();
+  }
+
+  @Override
+  public void setTransactionIsolation(int level) throws SQLException {
+    conn.setTransactionIsolation(level);
+  }
+
+  @Override
+  public int getTransactionIsolation() throws SQLException {
+    return conn.getTransactionIsolation();
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return conn.getWarnings();
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    conn.clearWarnings();
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    return conn.createStatement(resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public PreparedStatement
+      prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return conn.getTypeMap();
+  }
+
+  @Override
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+    conn.setTypeMap(map);
+  }
+
+  @Override
+  public void setHoldability(int holdability) throws SQLException {
+    conn.setHoldability(holdability);
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    return conn.getHoldability();
+  }
+
+  @Override
+  public Savepoint setSavepoint() throws SQLException {
+    return conn.setSavepoint();
+  }
+
+  @Override
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return conn.setSavepoint(name);
+  }
+
+  @Override
+  public void rollback(Savepoint savepoint) throws SQLException {
+    conn.rollback(savepoint);
+  }
+
+  @Override
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+    conn.releaseSavepoint(savepoint);
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency,
+      int resultSetHoldability) throws SQLException {
+    return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType,
+      int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+      int resultSetHoldability) throws SQLException {
+    return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return conn.prepareStatement(sql, autoGeneratedKeys);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return conn.prepareStatement(sql, columnIndexes);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return conn.prepareStatement(sql, columnNames);
+  }
+
+  @Override
+  public Clob createClob() throws SQLException {
+    return conn.createClob();
+  }
+
+  @Override
+  public Blob createBlob() throws SQLException {
+    return conn.createBlob();
+  }
+
+  @Override
+  public NClob createNClob() throws SQLException {
+    return conn.createNClob();
+  }
+
+  @Override
+  public SQLXML createSQLXML() throws SQLException {
+    return conn.createSQLXML();
+  }
+
+  @Override
+  public boolean isValid(int timeout) throws SQLException {
+    return conn.isValid(timeout);
+  }
+
+  @Override
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+    conn.setClientInfo(name, value);
+  }
+
+  @Override
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+    conn.setClientInfo(properties);
+  }
+
+  @Override
+  public String getClientInfo(String name) throws SQLException {
+    return conn.getClientInfo(name);
+  }
+
+  @Override
+  public Properties getClientInfo() throws SQLException {
+    return conn.getClientInfo();
+  }
+
+  @Override
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return conn.createArrayOf(typeName, elements);
+  }
+
+  @Override
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return conn.createStruct(typeName, attributes);
+  }
+
+    private Connection conn;
+
+    public DelegateConnection(Connection conn) {
+    this.conn = conn;
+  }
+
+    @Override
+    public void setSchema(String schema) throws SQLException {
+        conn.setSchema(schema);
+    }
+
+    @Override
+    public String getSchema() throws SQLException {
+        return conn.getSchema();
+    }
+
+    @Override
+    public void abort(Executor executor) throws SQLException {
+        conn.abort(executor);
+    }
+
+    @Override
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        conn.setNetworkTimeout(executor, milliseconds);
+    }
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        return conn.getNetworkTimeout();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
deleted file mode 100644
index 261522d..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.trace;
-
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.CallableStatement;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.NClob;
-import java.sql.PreparedStatement;
-import java.sql.SQLClientInfoException;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Savepoint;
-import java.sql.Statement;
-import java.sql.Struct;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-import org.apache.phoenix.jdbc.Jdbc7Shim;
-
-/**
- * Simple {@link Connection} that just delegates to an underlying {@link Connection}.
- * @param <D> delegate type that is both a {@link Connection} and a {@link Jdbc7Shim#Connection}
- */
-public class DelegatingConnection<D extends Connection & Jdbc7Shim.Connection> implements
-        Connection, Jdbc7Shim.Connection {
-
-  @Override
-  public <T> T unwrap(Class<T> iface) throws SQLException {
-    return conn.unwrap(iface);
-  }
-
-  @Override
-  public boolean isWrapperFor(Class<?> iface) throws SQLException {
-    return conn.isWrapperFor(iface);
-  }
-
-  @Override
-  public Statement createStatement() throws SQLException {
-    return conn.createStatement();
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql) throws SQLException {
-    return conn.prepareStatement(sql);
-  }
-
-  @Override
-  public CallableStatement prepareCall(String sql) throws SQLException {
-    return conn.prepareCall(sql);
-  }
-
-  @Override
-  public String nativeSQL(String sql) throws SQLException {
-    return conn.nativeSQL(sql);
-  }
-
-  @Override
-  public void setAutoCommit(boolean autoCommit) throws SQLException {
-    conn.setAutoCommit(autoCommit);
-  }
-
-  @Override
-  public boolean getAutoCommit() throws SQLException {
-    return conn.getAutoCommit();
-  }
-
-  @Override
-  public void commit() throws SQLException {
-    conn.commit();
-  }
-
-  @Override
-  public void rollback() throws SQLException {
-    conn.rollback();
-  }
-
-  @Override
-  public void close() throws SQLException {
-    conn.close();
-  }
-
-  @Override
-  public boolean isClosed() throws SQLException {
-    return conn.isClosed();
-  }
-
-  @Override
-  public DatabaseMetaData getMetaData() throws SQLException {
-    return conn.getMetaData();
-  }
-
-  @Override
-  public void setReadOnly(boolean readOnly) throws SQLException {
-    conn.setReadOnly(readOnly);
-  }
-
-  @Override
-  public boolean isReadOnly() throws SQLException {
-    return conn.isReadOnly();
-  }
-
-  @Override
-  public void setCatalog(String catalog) throws SQLException {
-    conn.setCatalog(catalog);
-  }
-
-  @Override
-  public String getCatalog() throws SQLException {
-    return conn.getCatalog();
-  }
-
-  @Override
-  public void setTransactionIsolation(int level) throws SQLException {
-    conn.setTransactionIsolation(level);
-  }
-
-  @Override
-  public int getTransactionIsolation() throws SQLException {
-    return conn.getTransactionIsolation();
-  }
-
-  @Override
-  public SQLWarning getWarnings() throws SQLException {
-    return conn.getWarnings();
-  }
-
-  @Override
-  public void clearWarnings() throws SQLException {
-    conn.clearWarnings();
-  }
-
-  @Override
-  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
-    return conn.createStatement(resultSetType, resultSetConcurrency);
-  }
-
-  @Override
-  public PreparedStatement
-      prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
-    return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
-  }
-
-  @Override
-  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
-      throws SQLException {
-    return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
-  }
-
-  @Override
-  public Map<String, Class<?>> getTypeMap() throws SQLException {
-    return conn.getTypeMap();
-  }
-
-  @Override
-  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
-    conn.setTypeMap(map);
-  }
-
-  @Override
-  public void setHoldability(int holdability) throws SQLException {
-    conn.setHoldability(holdability);
-  }
-
-  @Override
-  public int getHoldability() throws SQLException {
-    return conn.getHoldability();
-  }
-
-  @Override
-  public Savepoint setSavepoint() throws SQLException {
-    return conn.setSavepoint();
-  }
-
-  @Override
-  public Savepoint setSavepoint(String name) throws SQLException {
-    return conn.setSavepoint(name);
-  }
-
-  @Override
-  public void rollback(Savepoint savepoint) throws SQLException {
-    conn.rollback(savepoint);
-  }
-
-  @Override
-  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
-    conn.releaseSavepoint(savepoint);
-  }
-
-  @Override
-  public Statement createStatement(int resultSetType, int resultSetConcurrency,
-      int resultSetHoldability) throws SQLException {
-    return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int resultSetType,
-      int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-    return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
-  }
-
-  @Override
-  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
-      int resultSetHoldability) throws SQLException {
-    return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
-    return conn.prepareStatement(sql, autoGeneratedKeys);
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
-    return conn.prepareStatement(sql, columnIndexes);
-  }
-
-  @Override
-  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
-    return conn.prepareStatement(sql, columnNames);
-  }
-
-  @Override
-  public Clob createClob() throws SQLException {
-    return conn.createClob();
-  }
-
-  @Override
-  public Blob createBlob() throws SQLException {
-    return conn.createBlob();
-  }
-
-  @Override
-  public NClob createNClob() throws SQLException {
-    return conn.createNClob();
-  }
-
-  @Override
-  public SQLXML createSQLXML() throws SQLException {
-    return conn.createSQLXML();
-  }
-
-  @Override
-  public boolean isValid(int timeout) throws SQLException {
-    return conn.isValid(timeout);
-  }
-
-  @Override
-  public void setClientInfo(String name, String value) throws SQLClientInfoException {
-    conn.setClientInfo(name, value);
-  }
-
-  @Override
-  public void setClientInfo(Properties properties) throws SQLClientInfoException {
-    conn.setClientInfo(properties);
-  }
-
-  @Override
-  public String getClientInfo(String name) throws SQLException {
-    return conn.getClientInfo(name);
-  }
-
-  @Override
-  public Properties getClientInfo() throws SQLException {
-    return conn.getClientInfo();
-  }
-
-  @Override
-  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
-    return conn.createArrayOf(typeName, elements);
-  }
-
-  @Override
-  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
-    return conn.createStruct(typeName, attributes);
-  }
-
-    private D conn;
-
-    public DelegatingConnection(D conn) {
-    this.conn = conn;
-  }
-
-    @Override
-    public void setSchema(String schema) throws SQLException {
-        conn.setSchema(schema);
-    }
-
-    @Override
-    public String getSchema() throws SQLException {
-        return conn.getSchema();
-    }
-
-    @Override
-    public void abort(Executor executor) throws SQLException {
-        conn.abort(executor);
-    }
-
-    @Override
-    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
-        conn.setNetworkTimeout(executor, milliseconds);
-    }
-
-    @Override
-    public int getNetworkTimeout() throws SQLException {
-        return conn.getNetworkTimeout();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
index 05d9e41..686b5f1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -496,10 +496,9 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
         }
     }
 
-    private static class CountDownConnection extends DelegatingConnection {
+    private static class CountDownConnection extends DelegateConnection {
         private CountDownLatch commit;
 
-        @SuppressWarnings("unchecked")
         public CountDownConnection(Connection conn, CountDownLatch commit) {
             super(conn);
             this.commit = commit;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
deleted file mode 100644
index f51da30..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.jdbc;
-
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.concurrent.Executor;
-import java.util.logging.Logger;
-
-/**
- * Interfaces to be implemented by classes that need to be "JDK7" compliant,
- * but also run in JDK6
- */
-public final class Jdbc7Shim {
-
-    public interface Statement {  // Note: do not extend "regular" statement or else eclipse 3.7 complains
-        void closeOnCompletion() throws SQLException;
-        boolean isCloseOnCompletion() throws SQLException;
-    }
-
-    public interface CallableStatement extends Statement {
-        public <T> T getObject(int columnIndex, Class<T> type) throws SQLException;
-        public <T> T getObject(String columnLabel, Class<T> type) throws SQLException;
-    }
-
-    public interface Connection {
-         void setSchema(String schema) throws SQLException;
-         String getSchema() throws SQLException;
-         void abort(Executor executor) throws SQLException;
-         void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException;
-         int getNetworkTimeout() throws SQLException;
-    }
-
-    public interface ResultSet {
-         public <T> T getObject(int columnIndex, Class<T> type) throws SQLException;
-         public <T> T getObject(String columnLabel, Class<T> type) throws SQLException;
-    }
-
-    public interface DatabaseMetaData {
-        java.sql.ResultSet getPseudoColumns(String catalog, String schemaPattern,
-                             String tableNamePattern, String columnNamePattern)
-            throws SQLException;
-        boolean  generatedKeyAlwaysReturned() throws SQLException;
-    }
-
-    public interface Driver {
-        public Logger getParentLogger() throws SQLFeatureNotSupportedException;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index ed906a4..23d3235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -122,7 +122,7 @@ import co.cask.tephra.TransactionContext;
  * 
  * @since 0.1
  */
-public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jdbc7Shim.Connection, MetaDataMutated{
+public class PhoenixConnection implements Connection, MetaDataMutated {
     private final String url;
     private final ConnectionQueryServices services;
     private final Properties info;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 940ca52..87c02d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -98,7 +98,7 @@ import com.google.common.collect.Lists;
  *
  * @since 0.1
  */
-public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.phoenix.jdbc.Jdbc7Shim.DatabaseMetaData {
+public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final int INDEX_NAME_INDEX = 4; // Shared with FAMILY_NAME_INDEX
     public static final int FAMILY_NAME_INDEX = 4;
     public static final int COLUMN_NAME_INDEX = 3;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 8ee9dbd..c49bf37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -31,7 +31,6 @@ import java.util.logging.Logger;
 
 import javax.annotation.concurrent.Immutable;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -55,7 +54,7 @@ import com.google.common.collect.Maps;
  * @since 0.1
  */
 @Immutable
-public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoenix.jdbc.Jdbc7Shim.Driver, SQLCloseable {
+public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
     /**
      * The protocol for Phoenix Network Client 
      */ 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index c1bbe81..a3ce1a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -100,7 +100,7 @@ import com.google.common.annotations.VisibleForTesting;
  *
  * @since 0.1
  */
-public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.ResultSet {
+public class PhoenixResultSet implements ResultSet, SQLCloseable {
 
     private static final Log LOG = LogFactory.getLog(PhoenixResultSet.class);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e44e7a4e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 550e915..f36bbd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -177,7 +177,7 @@ import com.google.common.math.IntMath;
  * 
  * @since 0.1
  */
-public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
+public class PhoenixStatement implements Statement, SQLCloseable {
 	
     private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class);
     


[5/7] phoenix git commit: PHOENIX-2453 Make prepared statement creation delegate to org.apache.phoenix.jdbc.PhoenixConnection#prepareStatement(java.lang.String) (Clement Escoffier)

Posted by ja...@apache.org.
PHOENIX-2453 Make prepared statement creation delegate to org.apache.phoenix.jdbc.PhoenixConnection#prepareStatement(java.lang.String) (Clement Escoffier)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/24d9c3d6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/24d9c3d6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/24d9c3d6

Branch: refs/heads/4.x-HBase-0.98
Commit: 24d9c3d6354a6be695c46747d9349d94c614744c
Parents: 4e96747
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Dec 28 22:42:46 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  9 ++++--
 .../apache/phoenix/jdbc/PhoenixStatement.java   | 12 ++++----
 .../jdbc/PhoenixPreparedStatementTest.java      | 30 +++++++++++++++++---
 3 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/24d9c3d6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d301321..ed906a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -692,17 +692,20 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
 
     @Override
     public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        // Ignore autoGeneratedKeys, and just prepare the statement.
+        return prepareStatement(sql);
     }
 
     @Override
     public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        // Ignore columnIndexes, and just prepare the statement.
+        return prepareStatement(sql);
     }
 
     @Override
     public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        // Ignore columnNames, and just prepare the statement.
+        return prepareStatement(sql);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/24d9c3d6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 86a139b..550e915 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1300,32 +1300,32 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
 
     @Override
     public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return execute(sql);
     }
 
     @Override
     public boolean execute(String sql, int[] columnIndexes) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return execute(sql);
     }
 
     @Override
     public boolean execute(String sql, String[] columnNames) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return execute(sql);
     }
 
     @Override
     public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return executeUpdate(sql);
     }
 
     @Override
     public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return executeUpdate(sql);
     }
 
     @Override
     public int executeUpdate(String sql, String[] columnNames) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        return executeUpdate(sql);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/24d9c3d6/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
index f9b0274..287fb4b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixPreparedStatementTest.java
@@ -20,14 +20,12 @@ package org.apache.phoenix.jdbc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import java.sql.*;
 import java.util.Properties;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.junit.Test;
@@ -180,4 +178,28 @@ public class PhoenixPreparedStatementTest extends BaseConnectionlessQueryTest {
         assertEquals(0, phoenixStmt.getQueryTimeoutInMillis());
     }
 
+    @Test
+    public void testPreparedStatementWithGeneratedKeys() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement statement = conn.prepareStatement("CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null,  v1 VARCHAR(15), v2 DATE, " +
+            "v3 VARCHAR CONSTRAINT pk PRIMARY KEY (pk1, pk2))", PreparedStatement.RETURN_GENERATED_KEYS);
+        statement.execute();
+    }
+
+    @Test
+    public void testPreparedStatementWithColumnIndexes() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement statement = conn.prepareStatement("CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null,  v1 VARCHAR(15), v2 DATE, " +
+            "v3 VARCHAR CONSTRAINT pk PRIMARY KEY (pk1, pk2))", new int[0]);
+        statement.execute();
+    }
+
+    @Test
+    public void testPreparedStatementWithColumnNames() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement statement = conn.prepareStatement("CREATE TABLE T (pk1 CHAR(15) not null, pk2 VARCHAR not null,  v1 VARCHAR(15), v2 DATE, " +
+            "v3 VARCHAR CONSTRAINT pk PRIMARY KEY (pk1, pk2))", new String[0]);
+        statement.execute();
+    }
+
 }


[6/7] phoenix git commit: PHOENIX-2550 Increase JDBC major/minor version for 4.7 release

Posted by ja...@apache.org.
PHOENIX-2550 Increase JDBC major/minor version for 4.7 release


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3172ed45
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3172ed45
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3172ed45

Branch: refs/heads/4.x-HBase-0.98
Commit: 3172ed459d91a5387f29fa99b5d352a38ea0c764
Parents: e44e7a4
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Dec 30 09:32:53 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3172ed45/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index ba06828..bf4a192 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -55,7 +55,7 @@ import com.google.protobuf.ByteString;
  */
 public abstract class MetaDataProtocol extends MetaDataService {
     public static final int PHOENIX_MAJOR_VERSION = 4;
-    public static final int PHOENIX_MINOR_VERSION = 6;
+    public static final int PHOENIX_MINOR_VERSION = 7;
     public static final int PHOENIX_PATCH_NUMBER = 0;
     public static final int PHOENIX_VERSION =
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);


[2/7] phoenix git commit: PHOENIX-2411 Allow Phoenix to participate as transactional component

Posted by ja...@apache.org.
PHOENIX-2411 Allow Phoenix to participate as transactional component


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4e967475
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4e967475
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4e967475

Branch: refs/heads/4.x-HBase-0.98
Commit: 4e96747536078324f2c9044fc9815501e692aeed
Parents: 7e4d39e
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Dec 26 23:56:59 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:27 2015 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/tx/TransactionIT.java    | 136 ++++++++++++++++++-
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/MutationState.java   |   9 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  56 ++++++--
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  13 +-
 .../org/apache/phoenix/query/QueryServices.java |   9 +-
 .../phoenix/query/QueryServicesOptions.java     |   6 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |  61 ++++++---
 8 files changed, 247 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 0b3ba91..5a322bb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -20,8 +20,6 @@ package org.apache.phoenix.tx;
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
-import static org.apache.phoenix.util.TestUtil.analyzeTable;
-import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -41,22 +39,22 @@ import java.util.Properties;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -66,12 +64,15 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+import co.cask.tephra.hbase98.coprocessor.TransactionProcessor;
+
 public class TransactionIT extends BaseHBaseManagedTimeIT {
 	
 	private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
@@ -603,4 +604,125 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         }
         assertEquals(5, count);
     }
+    
+    @Test
+    public void testExternalTxContext() throws Exception {
+        ResultSet rs;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        
+        TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient();
+
+        String fullTableName = "T";
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
+        HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
+        stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
+        conn.commit();
+
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+        }
+
+        // Use HBase level Tephra APIs to start a new transaction
+        TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
+        TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
+        txContext.start();
+        
+        // Use HBase APIs to add a new row
+        Put put = new Put(Bytes.toBytes("z"));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b"));
+        txAware.put(put);
+        
+        // Use Phoenix APIs to add new row (sharing the transaction context)
+        pconn.setTransactionContext(txContext);
+        conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')");
+
+        // New connection should not see data as it hasn't been committed yet
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+        }
+        
+        // Use new connection to create a row with a conflict
+        Connection connWithConflict = DriverManager.getConnection(getUrl(), props);
+        connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')");
+        
+        // Existing connection should see data even though it hasn't been committed yet
+        rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(3,rs.getInt(1));
+        
+        // Use Tephra APIs directly to finish (i.e. commit) the transaction
+        txContext.finish();
+        
+        // Confirm that attempt to commit row with conflict fails
+        try {
+            connWithConflict.commit();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+        } finally {
+            connWithConflict.close();
+        }
+        
+        // New connection should now see data as it has been committed
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(3,rs.getInt(1));
+        }
+        
+        // Repeat the same as above, but this time abort the transaction
+        txContext = new TransactionContext(txServiceClient, txAware);
+        txContext.start();
+        
+        // Use HBase APIs to add a new row
+        put = new Put(Bytes.toBytes("j"));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e"));
+        txAware.put(put);
+        
+        // Use Phoenix APIs to add new row (sharing the transaction context)
+        pconn.setTransactionContext(txContext);
+        conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')");
+        
+        // Existing connection should see data even though it hasn't been committed yet
+        rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(5,rs.getInt(1));
+
+        connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')");
+        rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(4,rs.getInt(1));
+
+        // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+        txContext.abort();
+        
+        rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+        assertTrue(rs.next());
+        assertEquals(3,rs.getInt(1));
+
+        // Should succeed since conflicting row was aborted
+        connWithConflict.commit();
+
+        // New connection should now see data as it has been committed
+        try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+            rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(4,rs.getInt(1));
+        }
+        
+        // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been
+        // written to hide it.
+        Result result = htable.get(new Get(Bytes.toBytes("j")));
+        assertTrue(result.isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ef135eb..36036ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -280,6 +280,8 @@ public enum SQLExceptionCode {
     CANNOT_ALTER_TO_BE_TXN_IF_TXNS_DISABLED(1079, "44A10", "Cannot alter table to be transactional table if transactions are disabled"),
     CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP(1080, "44A11", "Cannot create a transactional table if transactions are disabled"),
     CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP(1081, "44A12", "Cannot alter table to be transactional table if transactions are disabled"),
+    TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"),
+    TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index eb2c77b..0ecce7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -124,6 +124,7 @@ public class MutationState implements SQLCloseable {
     private long sizeOffset;
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
+    private boolean isExternalTxContext = false;
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -170,6 +171,7 @@ public class MutationState implements SQLCloseable {
     					.getQueryServices().getTransactionSystemClient();
     			this.txContext = new TransactionContext(txServiceClient);
 		    } else {
+		        isExternalTxContext = true;
 		        this.txContext = txContext;
 		    }
 		} else {
@@ -224,7 +226,12 @@ public class MutationState implements SQLCloseable {
             boolean hasUncommittedData = false;
             for (TableRef source : sources) {
                 String sourcePhysicalName = source.getTable().getPhysicalName().getString();
-                if (source.getTable().isTransactional() && uncommittedPhysicalNames.contains(sourcePhysicalName)) {
+                // Tracking uncommitted physical table names is an optimization that prevents us from
+                // having to do a checkpoint if no data has yet been written. If we're using an
+                // external transaction context, it's possible that data was already written at the
+                // current transaction timestamp, so we always checkpoint in that case if we're
+                // reading and writing to the same table.
+                if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) {
                     hasUncommittedData = true;
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 68312c6..d301321 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -106,6 +106,8 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 
+import co.cask.tephra.TransactionContext;
+
 
 /**
  * 
@@ -124,11 +126,12 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final String url;
     private final ConnectionQueryServices services;
     private final Properties info;
-    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
-    private final MutationState mutationState;
     private final int mutateBatchSize;
     private final Long scn;
+    private MutationState mutationState;
+    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
+    private boolean isAutoFlush = false;
     private boolean isAutoCommit = false;
     private PMetaData metaData;
     private final PName tenantId;
@@ -158,6 +161,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException {
         this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade);
         this.isAutoCommit = connection.isAutoCommit;
+        this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
         this.statementExecutionCounter = connection.statementExecutionCounter;
     }
@@ -177,6 +181,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
         this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade());
         this.isAutoCommit = connection.isAutoCommit;
+        this.isAutoFlush = connection.isAutoFlush;
         this.sampler = connection.sampler;
         this.statementExecutionCounter = connection.statementExecutionCounter;
     }
@@ -216,6 +221,8 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
         checkScn(scnParam);
         this.scn = scnParam;
+        this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
+                && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH) ;
         this.isAutoCommit = JDBCUtil.getAutoCommit(
                 url, this.info,
                 this.services.getProps().getBoolean(
@@ -276,17 +283,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
 
     private static Properties filterKnownNonProperties(Properties info) {
         Properties prunedProperties = info;
-        if (info.contains(PhoenixRuntime.CURRENT_SCN_ATTRIB)) {
-            if (prunedProperties == info) {
-                prunedProperties = PropertiesUtil.deepCopy(info);
-            }
-            prunedProperties.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
-        }
-        if (info.contains(PhoenixRuntime.TENANT_ID_ATTRIB)) {
-            if (prunedProperties == info) {
-                prunedProperties = PropertiesUtil.deepCopy(info);
+        for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
+            if (info.containsKey(property)) { // containsKey: Hashtable.contains() tests values, not keys
+                if (prunedProperties == info) {
+                    prunedProperties = PropertiesUtil.deepCopy(info);
+                }
+                prunedProperties.remove(property);
             }
-            prunedProperties.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
         }
         return prunedProperties;
     }
@@ -573,6 +576,35 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         return isAutoCommit;
     }
 
+    public boolean getAutoFlush() {
+        return isAutoFlush;
+    }
+
+    public void setAutoFlush(boolean autoFlush) throws SQLException {
+        if (autoFlush && !this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH)
+            .build().buildException();
+        }
+        this.isAutoFlush = autoFlush;
+    }
+
+    public void flush() throws SQLException {
+        mutationState.sendUncommitted();
+    }
+        
+    public void setTransactionContext(TransactionContext txContext) throws SQLException {
+        if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
+            .build().buildException();
+        }
+        this.mutationState.rollback();
+        this.mutationState = new MutationState(this.mutationState.getMaxSize(), this, txContext);
+        
+        // Write data to HBase after each statement execution as the commit may not
+        // come through Phoenix APIs.
+        setAutoFlush(true);
+    }
+    
     @Override
     public String getCatalog() throws SQLException {
         return "";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 76836d5..86a139b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1165,6 +1165,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 PhoenixPreparedStatement statement = batch.get(i);
                 returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
             }
+            // Flush all changes in batch if auto flush is true
+            flushIfNecessary();
             // If we make it all the way through, clear the batch
             clearBatch();
             return returnCodes;
@@ -1269,9 +1271,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
             .build().buildException();
         }
-        return executeMutation(stmt);
+        int updateCount = executeMutation(stmt);
+        flushIfNecessary();
+        return updateCount;
     }
 
+    private void flushIfNecessary() throws SQLException {
+        if (connection.getAutoFlush()) {
+            connection.flush();
+        }
+    }
+    
     @Override
     public boolean execute(String sql) throws SQLException {
         CompilableStatement stmt = parseStatement(sql);
@@ -1281,6 +1291,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 .build().buildException();
             }
             executeMutation(stmt);
+            flushIfNecessary();
             return false;
         }
         executeQuery(stmt);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index c105e9d..7fc6e0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -84,7 +84,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
     
-    // Deprecated. Use FORCE_ROW_KEY_ORDER instead.
+    @Deprecated // Use FORCE_ROW_KEY_ORDER instead.
     public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB  = "phoenix.query.rowKeyOrderSaltedTable";
     
     public static final String USE_INDEXES_ATTRIB  = "phoenix.query.useIndexes";
@@ -162,9 +162,14 @@ public interface QueryServices extends SQLCloseable {
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
     public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default";
-    public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
     public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
     
+    // Transaction related configs
+    public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled";
+    // Controls whether or not uncommitted data is automatically sent to HBase
+    // at the end of a statement execution when transaction state is passed through.
+    public static final String AUTO_FLUSH_ATTRIB = "phoenix.transactions.autoFlush";
+
     // rpc queue configs
     public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
     public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index e8f905a..605e987 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -71,8 +71,8 @@ import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 
 import java.util.HashSet;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -197,11 +197,13 @@ public class QueryServicesOptions {
     // TODO Change this to true as part of PHOENIX-1543
     // We'll also need this for transactions to work correctly
     public static final boolean DEFAULT_AUTO_COMMIT = false;
-    public static final boolean DEFAULT_TRANSACTIONAL = false;
     public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false;
     public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false;
     public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
     
+    public static final boolean DEFAULT_TRANSACTIONAL = false;
+    public static final boolean DEFAULT_AUTO_FLUSH = false;
+
     private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
     
     public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4e967475/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 4ed6913..4c65a46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -98,14 +98,6 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixRuntime {
     /**
-     * Use this connection property to control HBase timestamps
-     * by specifying your own long timestamp value at connection time. All
-     * queries will use this as the upper bound of the time range for scans
-     * and DDL, and DML will use this as t he timestamp for key values.
-     */
-    public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
-
-    /**
      * Root for the JDBC URL that the Phoenix accepts accepts.
      */
     public final static String JDBC_PROTOCOL = "jdbc:phoenix";
@@ -121,13 +113,12 @@ public class PhoenixRuntime {
     public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
     /**
-     * Use this connection property to control the number of rows that are
-     * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
-     * It's only used when autoCommit is true and your source table is
-     * different than your target table or your SELECT statement has a
-     * GROUP BY clause.
+     * Use this connection property to control HBase timestamps
+     * by specifying your own long timestamp value at connection time. All
+     * queries will use this as the upper bound of the time range for scans
+     * and DDL, and DML will use this as the timestamp for key values.
      */
-    public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
+    public static final String CURRENT_SCN_ATTRIB = "CurrentSCN";
 
     /**
      * Use this connection property to help with fairness of resource allocation
@@ -139,11 +130,13 @@ public class PhoenixRuntime {
     public static final String TENANT_ID_ATTRIB = "TenantId";
 
     /**
-     * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix.
-     * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and
-     * having these annotation automatically passed into log lines and traces by Phoenix.
+     * Use this connection property to control the number of rows that are
+     * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
+     * It's only used when autoCommit is true and your source table is
+     * different than your target table or your SELECT statement has a
+     * GROUP BY clause.
      */
-    public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
+    public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
 
     /**
      * Use this connection property to explicitly enable or disable auto-commit on a new connection.
@@ -151,6 +144,29 @@ public class PhoenixRuntime {
     public static final String AUTO_COMMIT_ATTRIB = "AutoCommit";
 
     /**
+     * Use this connection property to explicitly set read consistency level on a new connection.
+     */
+    public static final String CONSISTENCY_ATTRIB = "Consistency";
+
+    /**
+     * Use this connection property to explicitly enable or disable request level metric collection.
+     */
+    public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
+
+    /**
+     * All Phoenix specific connection properties
+     * TODO: use enum instead
+     */
+    public final static String[] CONNECTION_PROPERTIES = {
+            CURRENT_SCN_ATTRIB,
+            TENANT_ID_ATTRIB,
+            UPSERT_BATCH_SIZE_ATTRIB,
+            AUTO_COMMIT_ATTRIB,
+            CONSISTENCY_ATTRIB,
+            REQUEST_METRIC_ATTRIB
+            };
+
+    /**
      * Use this as the zookeeper quorum name to have a connection-less connection. This enables
      * Phoenix-compatible HFiles to be created in a map/reduce job by creating tables,
      * upserting data into them, and getting the uncommitted state through {@link #getUncommittedData(Connection)}
@@ -158,9 +174,11 @@ public class PhoenixRuntime {
     public final static String CONNECTIONLESS = "none";
     
     /**
-     * Use this connection property to explicitly enable or disable request level metric collection.
+     * Use this connection property prefix for annotations that you want to show up in traces and log lines emitted by Phoenix.
+     * This is useful for annotating connections with information available on the client (e.g. user or session identifier) and
+     * having these annotations automatically passed into log lines and traces by Phoenix.
      */
-    public static final String REQUEST_METRIC_ATTRIB = "RequestMetric";
+    public static final String ANNOTATION_ATTRIB_PREFIX = "phoenix.annotation.";
 
     private static final String HEADER_IN_LINE = "in-line";
     private static final String SQL_FILE_EXT = ".sql";


[4/7] phoenix git commit: PHOENIX-2549 Modify Phoenix DatabaseMetaData implementation for transaction support

Posted by ja...@apache.org.
PHOENIX-2549 Modify Phoenix DatabaseMetaData implementation for transaction support


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8b3ca756
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8b3ca756
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8b3ca756

Branch: refs/heads/4.x-HBase-0.98
Commit: 8b3ca756ca6a0008f9710ff69d1c1bce430d6080
Parents: 3172ed4
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Dec 30 10:43:30 2015 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Dec 30 17:47:33 2015 -0800

----------------------------------------------------------------------
 .../phoenix/exception/SQLExceptionCode.java     |  1 +
 .../apache/phoenix/jdbc/PhoenixConnection.java  | 12 ++++--
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   | 44 +++++---------------
 .../apache/phoenix/jdbc/PhoenixDriverTest.java  | 27 ++++++++++++
 4 files changed, 48 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 36036ae..228f06f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -282,6 +282,7 @@ public enum SQLExceptionCode {
     CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP(1081, "44A12", "Cannot alter table to be transactional table if transactions are disabled"),
     TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"),
     TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"),
+    TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL(1084, "44A15", "Cannot set isolation level to TRANSACTION_REPEATABLE_READ or TRANSACTION_SERIALIZABLE if transactions are disabled"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 23d3235..04016c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -633,7 +633,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated {
 
     @Override
     public int getTransactionIsolation() throws SQLException {
-        return Connection.TRANSACTION_READ_COMMITTED;
+        boolean transactionsEnabled = getQueryServices().getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, 
+                QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
+        return  transactionsEnabled ?
+                Connection.TRANSACTION_SERIALIZABLE : Connection.TRANSACTION_READ_COMMITTED;
     }
 
     @Override
@@ -793,8 +796,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated {
 
     @Override
     public void setTransactionIsolation(int level) throws SQLException {
-        if (level != Connection.TRANSACTION_READ_COMMITTED) {
-            throw new SQLFeatureNotSupportedException();
+        boolean transactionsEnabled = getQueryServices().getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, 
+                QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
+        if (!transactionsEnabled && (level == Connection.TRANSACTION_REPEATABLE_READ || level == Connection.TRANSACTION_SERIALIZABLE)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL)
+            .build().buildException();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 87c02d0..a4748b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -73,30 +73,8 @@ import com.google.common.collect.Lists;
 
 /**
  *
- * JDBC DatabaseMetaData implementation of Phoenix reflecting read-only nature of driver.
- * Supported metadata methods include:
- * {@link #getTables(String, String, String, String[])}
- * {@link #getColumns(String, String, String, String)}
- * {@link #getTableTypes()}
- * {@link #getPrimaryKeys(String, String, String)}
- * {@link #getIndexInfo(String, String, String, boolean, boolean)}
- * {@link #getSchemas()}
- * {@link #getSchemas(String, String)}
- * {@link #getDatabaseMajorVersion()}
- * {@link #getDatabaseMinorVersion()}
- * {@link #getClientInfoProperties()}
- * {@link #getConnection()}
- * {@link #getDatabaseProductName()}
- * {@link #getDatabaseProductVersion()}
- * {@link #getDefaultTransactionIsolation()}
- * {@link #getDriverName()}
- * {@link #getDriverVersion()}
- * {@link #getSuperTables(String, String, String)}
- * {@link #getCatalogs()}
- * Other ResultSet methods return an empty result set.
- *
- *
- * @since 0.1
+ * JDBC DatabaseMetaData implementation of Phoenix.
+ * 
  */
 public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final int INDEX_NAME_INDEX = 4; // Shared with FAMILY_NAME_INDEX
@@ -1156,32 +1134,32 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
 
     @Override
     public boolean othersDeletesAreVisible(int type) throws SQLException {
-        return true;
+        return false;
     }
 
     @Override
     public boolean othersInsertsAreVisible(int type) throws SQLException {
-        return true;
+        return false;
     }
 
     @Override
     public boolean othersUpdatesAreVisible(int type) throws SQLException {
-        return true;
+        return false;
     }
 
     @Override
     public boolean ownDeletesAreVisible(int type) throws SQLException {
-        return false;
+        return true;
     }
 
     @Override
     public boolean ownInsertsAreVisible(int type) throws SQLException {
-        return false;
+        return true;
     }
 
     @Override
     public boolean ownUpdatesAreVisible(int type) throws SQLException {
-        return false;
+        return true;
     }
 
     @Override
@@ -1387,7 +1365,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
 
     @Override
     public boolean supportsMultipleTransactions() throws SQLException {
-        return false;
+        return true;
     }
 
     @Override
@@ -1534,12 +1512,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
 
     @Override
     public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
-        return level == connection.getTransactionIsolation();
+        return true;
     }
 
     @Override
     public boolean supportsTransactions() throws SQLException {
-        return false;
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8b3ca756/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index 61dc442..b70ea71 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -87,4 +87,31 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
             assertEquals(SQLExceptionCode.INVALID_SCN.getErrorCode(), e.getErrorCode());
         }
     }
+    
+    @Test
+    public void testDisallowIsolationLevel() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.setTransactionIsolation(Connection.TRANSACTION_NONE);
+        conn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+        conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+            fail();
+        } catch(SQLException e) {
+            assertEquals(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL.getErrorCode(), e.getErrorCode());
+        }
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+            fail();
+        } catch(SQLException e) {
+            assertEquals(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL.getErrorCode(), e.getErrorCode());
+        }
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+        conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+    }
 }