You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:45 UTC
[07/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation
transaction support through Tephra (James Taylor, Thomas D'Silva)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
new file mode 100644
index 0000000..6bcf7ea
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
+
+import com.google.common.collect.Lists;
+
+public class TransactionIT extends BaseHBaseManagedTimeIT {
+
+ // Name of the table every test in this class queries: INDEX_DATA_SCHEMA and
+ // TRANSACTIONAL_DATA_TABLE joined with QueryConstants.NAME_SEPARATOR.
+ private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
+
+ /**
+  * Runs before every test and calls {@code ensureTableCreated} for
+  * {@code TRANSACTIONAL_DATA_TABLE} against the test cluster URL.
+  */
+ @Before
+ public void setUp() throws SQLException {
+     final String url = getUrl();
+     ensureTableCreated(url, TRANSACTIONAL_DATA_TABLE);
+ }
+
+ /**
+  * Upserts two rows on a non-auto-commit connection and checks that the same connection
+  * reads them back both before and after the commit.
+  */
+ @Test
+ public void testReadOwnWrites() throws Exception {
+     final String query = "SELECT * FROM "+FULL_TABLE_NAME;
+     final String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+     try (Connection connection = DriverManager.getConnection(getUrl())) {
+         connection.setAutoCommit(false);
+         // nothing is in the table yet
+         ResultSet results = connection.createStatement().executeQuery(query);
+         assertFalse(results.next());
+
+         // write rows 1 and 2 without committing
+         PreparedStatement upsertStmt = connection.prepareStatement(upsertSql);
+         for (int row = 1; row <= 2; row++) {
+             TestUtil.setRowKeyColumns(upsertStmt, row);
+             upsertStmt.execute();
+         }
+
+         // the uncommitted rows are visible to the connection that wrote them
+         results = connection.createStatement().executeQuery(query);
+         for (int row = 1; row <= 2; row++) {
+             TestUtil.validateRowKeyColumns(results, row);
+         }
+         assertFalse(results.next());
+
+         connection.commit();
+
+         // and they are still there once the transaction is committed
+         results = connection.createStatement().executeQuery(query);
+         for (int row = 1; row <= 2; row++) {
+             TestUtil.validateRowKeyColumns(results, row);
+         }
+         assertFalse(results.next());
+     }
+ }
+
+ @Test
+ public void testTxnClosedCorrecty() throws Exception {
+ String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(false);
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ // upsert two rows
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ TestUtil.setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ // verify rows can be read even though commit has not been called
+ rs = conn.createStatement().executeQuery(selectSql);
+ TestUtil.validateRowKeyColumns(rs, 1);
+ TestUtil.validateRowKeyColumns(rs, 2);
+ assertFalse(rs.next());
+
+ conn.close();
+ // wait for any open txns to time out
+ Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000);
+ assertTrue("There should be no invalid transactions", txManager.getInvalidSize()==0);
+ }
+ }
+
+ /**
+  * Commits one row, then upserts a second row and deletes everything in a new
+  * transaction on the same connection. A second connection must still count one row
+  * until the first connection commits; afterwards the table reads as empty.
+  */
+ @Test
+ public void testDelete() throws Exception {
+     final String query = "SELECT * FROM " + FULL_TABLE_NAME;
+     final String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+     try (Connection writer = DriverManager.getConnection(getUrl());
+             Connection observer = DriverManager.getConnection(getUrl())) {
+         writer.setAutoCommit(false);
+         ResultSet results = writer.createStatement().executeQuery(query);
+         assertFalse(results.next());
+
+         PreparedStatement upsertStmt = writer.prepareStatement(upsertSql);
+         // row 1 is committed
+         TestUtil.setRowKeyColumns(upsertStmt, 1);
+         upsertStmt.execute();
+         writer.commit();
+
+         // row 2 stays uncommitted
+         TestUtil.setRowKeyColumns(upsertStmt, 2);
+         upsertStmt.execute();
+
+         // the delete covers the committed row and the uncommitted one
+         int deletedCount = writer.createStatement().executeUpdate("DELETE FROM " + FULL_TABLE_NAME);
+         assertEquals(2, deletedCount);
+
+         // the other connection sees neither the second upsert nor the delete yet
+         results = observer.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+         assertTrue(results.next());
+         assertEquals(1, results.getInt(1));
+
+         writer.commit();
+
+         // after the commit no rows are left
+         results = writer.createStatement().executeQuery(query);
+         assertFalse(results.next());
+     }
+ }
+
+ /**
+  * Runs a single-table query on an auto-commit connection and expects no rows.
+  */
+ @Test
+ public void testAutoCommitQuerySingleTable() throws Exception {
+     final String query = "SELECT * FROM " + FULL_TABLE_NAME;
+     try (Connection connection = DriverManager.getConnection(getUrl())) {
+         connection.setAutoCommit(true);
+         ResultSet results = connection.createStatement().executeQuery(query);
+         // the table is expected to be empty
+         assertFalse(results.next());
+     }
+ }
+
+ /**
+  * Runs a self-join of the transactional table on an auto-commit connection and
+  * expects no rows.
+  */
+ @Test
+ public void testAutoCommitQueryMultiTables() throws Exception {
+     final String joinQuery = "SELECT * FROM " + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)";
+     try (Connection connection = DriverManager.getConnection(getUrl())) {
+         connection.setAutoCommit(true);
+         ResultSet results = connection.createStatement().executeQuery(joinQuery);
+         // the table is expected to be empty, so the join yields nothing
+         assertFalse(results.next());
+     }
+ }
+
+ @Test
+ public void testColConflicts() throws Exception {
+ try (Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl())) {
+ conn1.setAutoCommit(false);
+ conn2.setAutoCommit(false);
+ String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+ conn1.setAutoCommit(false);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+ assertFalse(rs.next());
+ // upsert row using conn1
+ String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.setInt(7, 10);
+ stmt.execute();
+ // upsert row using conn2
+ stmt = conn2.prepareStatement(upsertSql);
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.setInt(7, 11);
+ stmt.execute();
+
+ conn1.commit();
+ //second commit should fail
+ try {
+ conn2.commit();
+ fail();
+ }
+ catch (SQLException e) {
+ assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
+ }
+ }
+ }
+
+ /**
+  * Two connections upsert the same row key but different columns (a.int_col1 and
+  * b.int_col2) in concurrent transactions. When the table is not flagged immutable,
+  * the second commit must fail with {@code TRANSACTION_CONFLICT_EXCEPTION}; when it is
+  * flagged immutable, the second commit must succeed.
+  */
+ private void testRowConflicts() throws Exception {
+     try (Connection conn1 = DriverManager.getConnection(getUrl());
+             Connection conn2 = DriverManager.getConnection(getUrl())) {
+         conn1.setAutoCommit(false);
+         conn2.setAutoCommit(false);
+         String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+         ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+         // looked up after the query above, in the same order as before
+         boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, FULL_TABLE_NAME)).isImmutableRows();
+         assertFalse(rs.next());
+         // upsert row using conn1
+         String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
+         PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+         TestUtil.setRowKeyColumns(stmt, 1);
+         stmt.setInt(7, 10);
+         stmt.execute();
+         // upsert the same row, different column, using conn2
+         upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)";
+         stmt = conn2.prepareStatement(upsertSql);
+         TestUtil.setRowKeyColumns(stmt, 1);
+         stmt.setInt(7, 11);
+         stmt.execute();
+
+         conn1.commit();
+         // second commit should fail unless the rows are immutable
+         try {
+             conn2.commit();
+             if (!immutableRows) fail();
+         } catch (SQLException e) {
+             if (immutableRows) {
+                 // no failure was expected; rethrow so the cause shows up in the report
+                 throw e;
+             }
+             // expected value first, so a mismatch is reported the right way round
+             assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+         }
+     }
+ }
+
+ /**
+  * Runs the shared row-conflict scenario in {@code testRowConflicts()} without
+  * changing any table property first.
+  */
+ @Test
+ public void testRowConflictDetected() throws Exception {
+ testRowConflicts();
+ }
+
+ /**
+  * Sets IMMUTABLE_ROWS=true on the table and then runs the shared row-conflict
+  * scenario in {@code testRowConflicts()}.
+  */
+ @Test
+ public void testNoConflictDetectionForImmutableRows() throws Exception {
+     // try-with-resources so the DDL connection is released even if the scenario fails;
+     // it stays open while the scenario runs, as it did before
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
+         testRowConflicts();
+     }
+ }
+
+ /**
+  * Creates a non-transactional table with an index, switches it to transactional with
+  * ALTER TABLE, and checks that the data table and the index both carry the
+  * TransactionProcessor coprocessor, that the existing rows stay readable, and that a
+  * rolled-back upsert disappears while a committed one stays.
+  */
+ @Test
+ public void testNonTxToTxTable() throws Exception {
+     // try-with-resources so the connection is released even when an assertion fails
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         conn.createStatement().execute("CREATE TABLE NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)");
+         conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (1)");
+         conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (2, 'a')");
+         conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (3, 'b')");
+         conn.commit();
+
+         conn.createStatement().execute("CREATE INDEX IDX ON NON_TX_TABLE(v)");
+         // Reset empty column value to an empty value like it is pre-transactions
+         HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
+         List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3)));
+         for (Put put : puts) {
+             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+         }
+         htable.put(puts);
+
+         conn.createStatement().execute("ALTER TABLE NON_TX_TABLE SET TRANSACTIONAL=true");
+
+         // both the data table and its index must now carry the transaction coprocessor
+         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
+         assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("IDX"));
+         assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+
+         conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (4, 'c')");
+         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE WHERE v IS NULL");
+         assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "NON_TX_TABLE")).isTransactional());
+         assertNextIntKeys(rs, 1);
+         conn.commit();
+
+         conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (5, 'd')");
+         rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE");
+         assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "IDX")).isTransactional());
+         // the uncommitted row 5 is visible to the connection that wrote it
+         assertNextIntKeys(rs, 1, 2, 3, 4, 5);
+         conn.rollback();
+
+         // after the rollback only the committed rows are left
+         rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE");
+         assertNextIntKeys(rs, 1, 2, 3, 4);
+     }
+ }
+
+ /**
+  * Asserts that the remaining rows of {@code rs} hold exactly {@code expectedKeys},
+  * in that order, in their first column.
+  */
+ private static void assertNextIntKeys(ResultSet rs, int... expectedKeys) throws SQLException {
+     for (int expectedKey : expectedKeys) {
+         assertTrue(rs.next());
+         assertEquals(expectedKey, rs.getInt(1));
+     }
+     assertFalse(rs.next());
+ }
+
+ /**
+  * Disables SYSTEM.CATALOG, attempts to switch a table to transactional, expects the
+  * ALTER to fail, and then checks that the table is still readable, does not carry
+  * the TransactionProcessor coprocessor, and has max versions of 1 on its default
+  * column family.
+  */
+ @Test
+ public void testNonTxToTxTableFailure() throws Exception {
+     // try-with-resources so the connection is released even when an assertion fails
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG
+         conn.createStatement().execute("CREATE TABLE SYSTEM.NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)");
+         conn.createStatement().execute("UPSERT INTO SYSTEM.NON_TX_TABLE VALUES (1)");
+         conn.commit();
+         // Reset empty column value to an empty value like it is pre-transactions
+         HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
+         Put put = new Put(PInteger.INSTANCE.toBytes(1));
+         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+         htable.put(put);
+
+         HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+         admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+         try {
+             // This will succeed initially in updating the HBase metadata, but then will fail when
+             // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore
+             // the coprocessors back to the non transactional ones.
+             conn.createStatement().execute("ALTER TABLE SYSTEM.NON_TX_TABLE SET TRANSACTIONAL=true");
+             fail();
+         } catch (SQLException e) {
+             assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled"));
+         } finally {
+             // always re-enable the catalog so later tests are not affected
+             admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+             admin.close();
+         }
+
+         ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM.NON_TX_TABLE WHERE v IS NULL");
+         assertTrue(rs.next());
+         assertEquals(1,rs.getInt(1));
+         assertFalse(rs.next());
+
+         // the failed ALTER must not leave transactional settings on the HBase table
+         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
+         assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
+                 getTableDescriptor(Bytes.toBytes("SYSTEM.NON_TX_TABLE")).
+                 getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
+     }
+ }
+
+ @Test
+ public void testProperties() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE NON_TX_TABLE1(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR) TTL=1000");
+ conn.createStatement().execute("CREATE INDEX idx1 ON NON_TX_TABLE1(a.v, b.v) TTL=1000");
+ conn.createStatement().execute("CREATE INDEX idx2 ON NON_TX_TABLE1(c.v) INCLUDE (a.v, b.v) TTL=1000");
+
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE1 SET TRANSACTIONAL=true");
+
+ HTableDescriptor desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE1"));
+ for (HColumnDescriptor colDesc : desc.getFamilies()) {
+ assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+ assertEquals(1000, colDesc.getTimeToLive());
+ assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+ }
+
+ desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1"));
+ for (HColumnDescriptor colDesc : desc.getFamilies()) {
+ assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+ assertEquals(1000, colDesc.getTimeToLive());
+ assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+ }
+
+ desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2"));
+ for (HColumnDescriptor colDesc : desc.getFamilies()) {
+ assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+ assertEquals(1000, colDesc.getTimeToLive());
+ assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+ }
+
+ conn.createStatement().execute("CREATE TABLE NON_TX_TABLE2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE2 SET TRANSACTIONAL=true, VERSIONS=10");
+ desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE2"));
+ for (HColumnDescriptor colDesc : desc.getFamilies()) {
+ assertEquals(10, colDesc.getMaxVersions());
+ assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
+ assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL));
+ }
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE2 SET TTL=1000");
+ desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE2"));
+ for (HColumnDescriptor colDesc : desc.getFamilies()) {
+ assertEquals(10, colDesc.getMaxVersions());
+ assertEquals(1000, colDesc.getTimeToLive());
+ assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+ }
+
+ conn.createStatement().execute("CREATE TABLE NON_TX_TABLE3(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE3 SET TRANSACTIONAL=true, b.VERSIONS=10, c.VERSIONS=20");
+ desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE3"));
+ assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, desc.getFamily(Bytes.toBytes("A")).getMaxVersions());
+ assertEquals(10, desc.getFamily(Bytes.toBytes("B")).getMaxVersions());
+ assertEquals(20, desc.getFamily(Bytes.toBytes("C")).getMaxVersions());
+
+ conn.createStatement().execute("CREATE TABLE NON_TX_TABLE4(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
+ try {
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE4 SET TRANSACTIONAL=true, VERSIONS=1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE.getErrorCode(), e.getErrorCode());
+ }
+
+ try {
+ conn.createStatement().execute("ALTER TABLE NON_TX_TABLE4 SET TRANSACTIONAL=true, b.VERSIONS=1");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE.getErrorCode(), e.getErrorCode());
+ }
+
+ conn.createStatement().execute("CREATE TABLE TX_TABLE1(k INTEGER PRIMARY KEY, v VARCHAR) TTL=1000, TRANSACTIONAL=true");
+ desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("TX_TABLE1"));
+ for (HColumnDescriptor colDesc : desc.getFamilies()) {
+ assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+ assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
+ assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+ }
+ }
+
+ /**
+  * Creates a table with transactional=true and checks the Phoenix metadata and the
+  * HBase coprocessor; checks that switching it back to non-transactional is rejected;
+  * checks that creating a transactional Phoenix table over a pre-existing HBase table
+  * sets TxConstants.READ_NON_TX_DATA; and checks CREATE TABLE IF NOT EXISTS with and
+  * without the transactional flag.
+  */
+ @Test
+ public void testCreateTableToBeTransactional() throws Exception {
+     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+     // try-with-resources so the connection is released even when an assertion fails
+     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+         String ddl = "CREATE TABLE TEST_TRANSACTIONAL_TABLE (k varchar primary key) transactional=true";
+         conn.createStatement().execute(ddl);
+         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+         PTable table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
+         HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
+         assertTrue(table.isTransactional());
+         assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+
+         // a transactional table may not be switched back
+         try {
+             ddl = "ALTER TABLE TEST_TRANSACTIONAL_TABLE SET transactional=false";
+             conn.createStatement().execute(ddl);
+             fail();
+         } catch (SQLException e) {
+             assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
+         }
+
+         // the admin handle is closed as soon as the HBase-level checks are done
+         try (HBaseAdmin admin = pconn.getQueryServices().getAdmin()) {
+             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING"));
+             desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES));
+             admin.createTable(desc);
+             ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true";
+             conn.createStatement().execute(ddl);
+             assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING")).getValue(TxConstants.READ_NON_TX_DATA));
+         }
+
+         // Re-declaring the existing transactional table without the flag must be rejected.
+         ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)";
+         try {
+             conn.createStatement().execute(ddl);
+             fail();
+         } catch (SQLException e) {
+             assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
+         }
+         // Should be ok, as HBase metadata should match existing metadata.
+         ddl += " transactional=true";
+         conn.createStatement().execute(ddl);
+         table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
+         htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
+         assertTrue(table.isTransactional());
+         assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+     }
+ }
+
+ /**
+  * Commits one row and then runs the same current_date() query twice, one second
+  * apart, on the same connection; the second value must be later than the first.
+  */
+ // The @Test annotation was missing, so JUnit never ran this method.
+ @Test
+ public void testCurrentDate() throws Exception {
+     String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME;
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         conn.setAutoCommit(false);
+         ResultSet rs = conn.createStatement().executeQuery(selectSql);
+         assertFalse(rs.next());
+
+         String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+         PreparedStatement stmt = conn.prepareStatement(upsert);
+         // upsert a single row so each query below returns exactly one value
+         TestUtil.setRowKeyColumns(stmt, 1);
+         stmt.execute();
+         conn.commit();
+
+         rs = conn.createStatement().executeQuery(selectSql);
+         assertTrue(rs.next());
+         Date date1 = rs.getDate(1);
+         assertFalse(rs.next());
+
+         // wait so the two readings can differ
+         Thread.sleep(1000);
+
+         rs = conn.createStatement().executeQuery(selectSql);
+         assertTrue(rs.next());
+         Date date2 = rs.getDate(1);
+         assertFalse(rs.next());
+         assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime());
+     }
+ }
+
+ @Test
+ public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ stmt.execute("DROP TABLE DEMO");
+ stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true");
+ stmt.execute("CREATE INDEX DEMO_IDX ON DEMO (v1) INCLUDE(v2)");
+ assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "DEMO")).isTransactional());
+ assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "DEMO_IDX")).isTransactional());
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
new file mode 100644
index 0000000..34869dd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import co.cask.tephra.Transaction.VisibilityLevel;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
+
+ // When true the tests create a LOCAL index, otherwise a regular one.
+ private final boolean localIndex;
+ // When false the tests append IMMUTABLE_ROWS=true to their CREATE TABLE statements.
+ private final boolean mutable;
+ // Set to TestUtil.DEFAULT_DATA_TABLE_NAME by the constructor.
+ private String tableName;
+ // "IDX_" followed by the construction-time clock value in milliseconds.
+ private String indexName;
+ // "SEQ_" followed by the construction-time clock value in milliseconds.
+ private String seqName;
+ // Result of SchemaUtil.getTableName(tableName, tableName); used in all DDL and DML below.
+ private String fullTableName;
+
+ public TxCheckpointIT(boolean localIndex, boolean mutable) {
+ this.localIndex = localIndex;
+ this.mutable = mutable;
+ this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
+ this.indexName = "IDX_" + System.currentTimeMillis();
+ this.seqName = "SEQ_" + System.currentTimeMillis();
+ this.fullTableName = SchemaUtil.getTableName(tableName, tableName);
+ }
+
+ /**
+  * Sets up the test driver with QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB set to
+  * "true".
+  */
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+     Map<String,String> driverProps = Maps.newHashMapWithExpectedSize(1);
+     driverProps.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.TRUE.toString());
+     ReadOnlyProps overrides = new ReadOnlyProps(driverProps.entrySet().iterator());
+     setUpTestDriver(overrides);
+ }
+
+ @Parameters(name="localIndex = {0} , mutable = {1}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] {
+ { false, false }, { false, true }, { true, false }, { true, true }
+ });
+ }
+
+ /**
+  * Seeds the table with one row and then runs UPSERT ... SELECT from the table into
+  * itself six times with a batch size of 3. Each run is expected to upsert exactly as
+  * many rows as existed before it (1, 2, 4, ...), i.e. the SELECT must not see rows
+  * written by the same statement.
+  */
+ @Test
+ public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
+     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+     props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+     props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+     props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+     // try-with-resources so the connection is released even when an assertion fails
+     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+         conn.setAutoCommit(true);
+         conn.createStatement().execute("CREATE SEQUENCE "+seqName);
+         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+         conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)");
+
+         conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)");
+         for (int i=0; i<6; i++) {
+             try (Statement stmt = conn.createStatement()) {
+                 int upsertCount = stmt.executeUpdate("UPSERT INTO " + fullTableName + " SELECT NEXT VALUE FOR " + seqName + ", val FROM " + fullTableName);
+                 // the row count doubles on every pass; 1 << i is the exact integer 2^i
+                 assertEquals(1 << i, upsertCount);
+             }
+         }
+     }
+ }
+
+ /**
+  * Commits two rows, deletes one of them without committing, checks that the delete is
+  * visible to the same connection, rolls back, and checks that both rows are visible
+  * again. Every check is run once ordered by k and once ordered by v1 (the indexed
+  * column).
+  */
+ @Test
+ public void testRollbackOfUncommittedDelete() throws Exception {
+     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+     final String byKey = "select k, v1, v2 from " + fullTableName + " ORDER BY k";
+     final String byIndexedCol = "select k, v1, v2 from " + fullTableName + " ORDER BY v1";
+     final String[] row1 = { "x1", "y1", "a1" };
+     final String[] row2 = { "x2", "y2", "a2" };
+     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+         conn.setAutoCommit(false);
+         Statement stmt = conn.createStatement();
+         stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+         stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
+
+         stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')");
+         stmt.executeUpdate("upsert into " + fullTableName + " values('x2', 'y2', 'a2')");
+
+         // both uncommitted rows are visible through either ordering
+         assertKV1V2Rows(stmt, byKey, row1, row2);
+         assertKV1V2Rows(stmt, byIndexedCol, row1, row2);
+
+         conn.commit();
+
+         stmt.executeUpdate("DELETE FROM " + fullTableName + " WHERE k='x1' AND v1='y1' AND v2='a1'");
+         // the uncommitted delete hides the first row
+         assertKV1V2Rows(stmt, byKey, row2);
+         assertKV1V2Rows(stmt, byIndexedCol, row2);
+
+         conn.rollback();
+
+         // the rollback brings the deleted row back
+         assertKV1V2Rows(stmt, byKey, row1, row2);
+         assertKV1V2Rows(stmt, byIndexedCol, row1, row2);
+     }
+ }
+
+ /**
+  * Runs {@code query} on {@code stmt} and asserts that it returns exactly
+  * {@code expectedRows}, in order, comparing the columns of each row as strings.
+  */
+ private static void assertKV1V2Rows(Statement stmt, String query, String[]... expectedRows) throws SQLException {
+     ResultSet rs = stmt.executeQuery(query);
+     for (String[] expectedRow : expectedRows) {
+         assertTrue(rs.next());
+         for (int col = 0; col < expectedRow.length; col++) {
+             // JDBC columns are 1-based
+             assertEquals(expectedRow[col], rs.getString(col + 1));
+         }
+     }
+     assertFalse(rs.next());
+ }
+
+ /**
+  * Seeds an indexed table with three committed rows (ids 1 to 3), then runs the
+  * UPSERT SELECT sequence from {@link #upsertRows(Connection)} twice: the first run is
+  * rolled back, after which max(id) must still be 3; the second run is committed,
+  * after which max(id) must be 6.
+  */
+ @Test
+ public void testCheckpointForUpsertSelect() throws Exception {
+     final Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+     // Optional DDL fragments driven by the test's mutable/localIndex settings
+     final String tableOptions = mutable ? "" : " IMMUTABLE_ROWS=true";
+     final String indexScope = localIndex ? "LOCAL " : "";
+     try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
+         connection.setAutoCommit(false);
+         Statement seed = connection.createStatement();
+
+         seed.execute("CREATE TABLE " + fullTableName
+                 + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableOptions);
+         seed.execute("CREATE " + indexScope + "INDEX " + indexName
+                 + " ON " + fullTableName + " (v1) INCLUDE(v2)");
+
+         // Three seed rows, committed so they survive the rollback below
+         final String[] seedRows = { "1, 'a2', 'b1'", "2, 'a2', 'b2'", "3, 'a3', 'b3'" };
+         for (String row : seedRows) {
+             seed.executeUpdate("upsert into " + fullTableName + " values(" + row + ")");
+         }
+         connection.commit();
+
+         // First pass is discarded: only the seed rows may remain
+         upsertRows(connection);
+         connection.rollback();
+         verifyRows(connection, 3);
+
+         // Second pass is kept: ids 4 to 6 become visible
+         upsertRows(connection);
+         connection.commit();
+         verifyRows(connection, 6);
+     }
+ }
+
+ private void verifyRows(Connection conn, int expectedMaxId) throws SQLException {
+ ResultSet rs;
+ //query the data table
+ rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ max(id) from " + fullTableName + "");
+ assertTrue(rs.next());
+ assertEquals(expectedMaxId, rs.getLong(1));
+ assertFalse(rs.next());
+
+ // query the index
+ rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ max(id) from " + fullTableName + "");
+ assertTrue(rs.next());
+ assertEquals(expectedMaxId, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+
+ /**
+ * Runs three UPSERT SELECT statements that each insert a row with id max(id)+1 into the
+ * test table, reading from that same table, and checks the transaction state after each.
+ * The assertions hard-code max(id) values of 4, 5 and 6, so the table is expected to have
+ * a max id of 3 on entry. Nothing is committed or rolled back here; that is left to the caller.
+ *
+ * @param conn connection that unwraps to a PhoenixConnection
+ * @throws SQLException if any statement fails
+ */
+ private void upsertRows(Connection conn) throws SQLException {
+ ResultSet rs;
+ MutationState state = conn.unwrap(PhoenixConnection.class)
+ .getMutationState();
+ // Start the transaction explicitly so the initial write pointer can be captured
+ state.startTransaction();
+ long wp = state.getWritePointer();
+ // First upsert select: visibility level changes but the write pointer must stay put
+ conn.createStatement().execute(
+ "upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
+ assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+ state.getVisibilityLevel());
+ assertEquals(wp, state.getWritePointer()); // Make sure write ptr
+ // didn't move
+ rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");
+
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Second upsert select: the write pointer must now differ from the initial one
+ conn.createStatement().execute(
+ "upsert into " + fullTableName + " select max(id)+1, 'a5', 'b5' from " + fullTableName + "");
+ assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+ state.getVisibilityLevel());
+ assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
+ // moves
+ // Remember the new pointer so the next statement is compared against it
+ wp = state.getWritePointer();
+ rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");
+
+ assertTrue(rs.next());
+ assertEquals(5, rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Third upsert select: the write pointer must move again relative to the previous value
+ conn.createStatement().execute(
+ "upsert into " + fullTableName + " select max(id)+1, 'a6', 'b6' from " + fullTableName + "");
+ assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+ state.getVisibilityLevel());
+ assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
+ // moves
+ // This last value of wp is not read again before the method returns
+ wp = state.getWritePointer();
+ rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");
+
+ assertTrue(rs.next());
+ assertEquals(6, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+
+ /**
+ * Exercises DELETE and UPSERT SELECT statements inside one transaction on two tables
+ * (fullTableName + "1", which has an index on FK1B, and fullTableName + "2"), checking
+ * the visibility level, the write pointer and the visible rows after each step, and
+ * finally that a rollback restores the three committed rows.
+ */
+ @Test
+ public void testCheckpointForDeleteAndUpsert() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ ResultSet rs;
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+ conn.setAutoCommit(false);
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)"
+ + (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+ stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)"
+ + (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+ stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+ + "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)");
+
+ // Committed baseline: table 1 holds (1,3,3), (2,2,2), (3,1,1); table 2 holds (1,1)
+ stmt.executeUpdate("upsert into " + fullTableName + "1 values (1, 3, 3)");
+ stmt.executeUpdate("upsert into " + fullTableName + "1 values (2, 2, 2)");
+ stmt.executeUpdate("upsert into " + fullTableName + "1 values (3, 1, 1)");
+ stmt.executeUpdate("upsert into " + fullTableName + "2 values (1, 1)");
+ conn.commit();
+
+ MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+ state.startTransaction();
+ long wp = state.getWritePointer();
+ // Single-table delete: only row 2 has id1 = fk1b. The visibility level is expected
+ // to be plain SNAPSHOT and the write pointer unchanged after this statement.
+ conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
+ assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
+ assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
+
+ rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(3,rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Expected order here is 3 then 1, i.e. ascending FK1B (id 3 has FK1B=1, id 1 has FK1B=3).
+ // NOTE(review): the hint names DEMO/IDX literally rather than the test's table/index
+ // names - confirm it really targets the index created above.
+ rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(3,rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Delete driven by a subquery that reads the table being deleted from. The join matches
+ // table 2's fk2=1 with id1=1, whose fk1a is 3, so row 3 is removed. This time the
+ // visibility level must be SNAPSHOT_EXCLUDE_CURRENT and the write pointer must differ.
+ conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
+ assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+ assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+
+ rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertFalse(rs.next());
+
+ // With only row 1 left, the upsert select adds (4,1,1); table 2 gains (2,4)
+ stmt.executeUpdate("upsert into " + fullTableName + "1 SELECT id1 + 3, id1, id1 FROM " + fullTableName + "1");
+ stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 4)");
+
+ // The join now matches id1=1 (fk1a=3) and id1=4 (fk1a=1); of those fk1a values only
+ // id 1 still exists, so row 1 is removed and row 4 remains.
+ conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
+ assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+ // NOTE(review): wp still holds the pointer captured before the first delete, so this only
+ // shows the pointer differs from the initial value, not that this statement moved it.
+ assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+
+ rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(4,rs.getLong(1));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(4,rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Rolling back must discard every change above and expose the committed baseline again
+ conn.rollback();
+
+ rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(3,rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Expected order 3, 2, 1 again corresponds to ascending FK1B (1, 2, 3)
+ rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+ assertTrue(rs.next());
+ assertEquals(3,rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertFalse(rs.next());
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
new file mode 100644
index 0000000..a3b3aa2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TxPointInTimeQueryIT extends BaseClientManagedTimeIT {
+
+ protected long ts;
+
+ @Before
+ public void initTable() throws Exception {
+ ts = nextTimestamp();
+ }
+
+ @Test
+ public void testQueryWithSCN() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+ try {
+ conn.createStatement().execute(
+ "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
+ fail();
+ } catch (SQLException e) {
+ assertEquals("Unexpected Exception",
+ SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET
+ .getErrorCode(), e.getErrorCode());
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index a1563db..359e08f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import co.cask.tephra.Transaction;
+
import org.apache.phoenix.index.IndexMaintainer;
public interface IndexMetaDataCache extends Closeable {
@@ -36,6 +38,13 @@ public interface IndexMetaDataCache extends Closeable {
public List<IndexMaintainer> getIndexMaintainers() {
return Collections.emptyList();
}
+
+ @Override
+ public Transaction getTransaction() {
+ return null;
+ }
+
};
public List<IndexMaintainer> getIndexMaintainers();
+ public Transaction getTransaction();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 3cfdd71..f188ab2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -71,6 +71,7 @@ import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
import com.google.common.collect.ImmutableSet;
+import com.google.protobuf.HBaseZeroCopyByteString;
/**
*
@@ -144,7 +145,7 @@ public class ServerCacheClient {
}
- public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+ public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
ConnectionQueryServices services = connection.getQueryServices();
MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
List<Closeable> closeables = new ArrayList<Closeable>();
@@ -213,6 +214,7 @@ public class ServerCacheClient {
ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
builder.setCacheFactory(svrCacheFactoryBuider.build());
+ builder.setTxState(HBaseZeroCopyByteString.wrap(txState));
instance.addServerCache(controller, builder.build(), rpcCallback);
if(controller.getFailedOn() != null) {
throw controller.getFailedOn();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index b968a9b..c7cd58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -37,6 +37,6 @@ import org.apache.phoenix.memory.MemoryManager;
public interface TenantCache {
MemoryManager getMemoryManager();
Closeable getServerCache(ImmutableBytesPtr cacheId);
- Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+ Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 9005fa8..6ef7a6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -23,14 +23,17 @@ import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.cache.*;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.util.Closeables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
/**
*
* Cache per tenant on server side. Tracks memory usage for each
@@ -80,11 +83,11 @@ public class TenantCacheImpl implements TenantCache {
}
@Override
- public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
- MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+ public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException {
+ MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
boolean success = false;
try {
- Closeable element = cacheFactory.newCache(cachePtr, chunk);
+ Closeable element = cacheFactory.newCache(cachePtr, txState, chunk);
getServerCaches().put(cacheId, element);
success = true;
return element;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index f09b508..d5b74fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -132,8 +132,8 @@ public class CreateTableCompiler {
// on our connection.
new DelegateConnectionQueryServices(connection.getQueryServices()) {
@Override
- public PMetaData addTable(PTable table) throws SQLException {
- return connection.addTable(table);
+ public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+ return connection.addTable(table, resolvedTime);
}
},
connection, tableRef.getTimeStamp());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f0f693e..12aa7c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -78,6 +78,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -323,16 +324,20 @@ public class DeleteCompiler {
// - read-only VIEW
// - transactional table with a connection having an SCN
// TODO: SchemaUtil.isReadOnly(PTable, connection)?
- if ( table.getType() == PTableType.VIEW && table.getViewType().isReadOnly() ) {
+ if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
throw new ReadOnlyTableException(schemaName,tableName);
}
+ else if (table.isTransactional() && connection.getSCN() != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
+ .setTableName(tableName).build().buildException();
+ }
immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty();
noQueryReqd = !hasLimit;
// Can't run on same server for transactional data, as we need the row keys for the data
// that is being upserted for conflict detection purposes.
- runOnServer = isAutoCommit && noQueryReqd;
+ runOnServer = isAutoCommit && noQueryReqd && !table.isTransactional();
HintNode hint = delete.getHint();
if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
@@ -363,8 +368,8 @@ public class DeleteCompiler {
QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory, new SequenceManager(statement));
dataPlanToBe = compiler.compile();
queryPlans = Lists.newArrayList(mayHaveImmutableIndexes
- ? optimizer.getApplicablePlans(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
- : optimizer.getBestPlan(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
+ ? optimizer.getApplicablePlans(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
+ : optimizer.getBestPlan(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
if (mayHaveImmutableIndexes) { // FIXME: this is ugly
// Lookup the table being deleted from in the cache, as it's possible that the
// optimizer updated the cache if it found indexes that were out of date.
@@ -538,11 +543,12 @@ public class DeleteCompiler {
ImmutableBytesWritable ptr = context.getTempPtr();
PTable table = tableRef.getTable();
table.getIndexMaintainers(ptr, context.getConnection());
+ byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
ServerCache cache = null;
try {
if (ptr.getLength() > 0) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
byte[] uuidValue = cache.getId();
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 426556f..0828b94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -80,6 +80,7 @@ import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,7 +227,7 @@ public class FromCompiler {
PTable t = PTableImpl.makePTable(table, projectedColumns);
return new SingleTableColumnResolver(connection, new TableRef(tableRef.getTableAlias(), t, tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
}
-
+
public static ColumnResolver getResolver(TableRef tableRef)
throws SQLException {
SingleTableColumnResolver visitor = new SingleTableColumnResolver(tableRef);
@@ -407,7 +408,7 @@ public class FromCompiler {
PTable theTable = null;
if (updateCacheImmediately || connection.getAutoCommit()) {
MetaDataMutationResult result = client.updateCache(schemaName, tableName);
- timeStamp = result.getMutationTime();
+ timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
theTable = result.getTable();
if (theTable == null) {
throw new TableNotFoundException(schemaName, tableName, timeStamp);
@@ -427,7 +428,7 @@ public class FromCompiler {
if (theTable == null) {
MetaDataMutationResult result = client.updateCache(schemaName, tableName);
if (result.wasUpdated()) {
- timeStamp = result.getMutationTime();
+ timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
theTable = result.getTable();
}
}
@@ -651,7 +652,7 @@ public class FromCompiler {
PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
null, null, columns, null, null, Collections.<PTable>emptyList(),
false, Collections.<PName>emptyList(), null, null, false, false, false, null,
- null, null, false);
+ null, null, false, false);
String alias = subselectNode.getAlias();
TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index e4ca5d9..b55e4aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1302,7 +1302,7 @@ public class JoinCompiler {
left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(),
- left.rowKeyOrderOptimizable());
+ left.rowKeyOrderOptimizable(), left.isTransactional());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index c6f6bf2..506623b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
@@ -179,7 +180,14 @@ public class PostDDLCompiler {
};
PhoenixStatement statement = new PhoenixStatement(connection);
StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
- ScanUtil.setTimeRange(scan, timestamp);
+ long ts = timestamp;
+ // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+ // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+ // in this case, so maybe this is ok.
+ if (tableRef.getTable().isTransactional()) {
+ ts = TransactionUtil.convertToNanoseconds(ts);
+ }
+ ScanUtil.setTimeRange(scan, ts);
if (emptyCF != null) {
scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 312de45..80c4b89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -252,7 +252,8 @@ public class StatementContext {
public long getCurrentTime() throws SQLException {
long ts = this.getCurrentTable().getTimeStamp();
- if (ts != QueryConstants.UNSET_TIMESTAMP) {
+ // if the table is transactional then it is only resolved once per query, so we can't use the table timestamp
+ if (!this.getCurrentTable().getTable().isTransactional() && ts != QueryConstants.UNSET_TIMESTAMP) {
return ts;
}
if (currentTime != QueryConstants.UNSET_TIMESTAMP) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index c6aa546..551b05c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -152,7 +152,7 @@ public class TupleProjectionCompiler {
table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- table.getIndexType(), table.rowKeyOrderOptimizable());
+ table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional());
}
public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
@@ -179,7 +179,7 @@ public class TupleProjectionCompiler {
retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- null, table.rowKeyOrderOptimizable());
+ null, table.rowKeyOrderOptimizable(), table.isTransactional());
}
// For extracting column references from single select statement
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index afca97a..298303d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -82,7 +82,7 @@ public class UnionCompiler {
PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME,
PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null,
projectedColumns, null, null, null,
- true, null, null, null, true, true, true, null, null, null, false);
+ true, null, null, null, true, true, true, null, null, null, false, false);
TableRef tableRef = new TableRef(null, tempTable, 0, false);
return tableRef;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 953eb2f..0f7f6f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -182,7 +182,7 @@ public class UpsertCompiler {
if (isAutoCommit && rowCount % batchSize == 0) {
MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection);
connection.getMutationState().join(state);
- connection.commit();
+ connection.getMutationState().send();
mutation.clear();
}
}
@@ -292,9 +292,13 @@ public class UpsertCompiler {
// Cannot update:
// - read-only VIEW
// - transactional table with a connection having an SCN
- if ( table.getType() == PTableType.VIEW && table.getViewType().isReadOnly() ) {
+ if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
throw new ReadOnlyTableException(schemaName,tableName);
}
+ else if (table.isTransactional() && connection.getSCN() != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
+ .setTableName(tableName).build().buildException();
+ }
boolean isSalted = table.getBucketNum() != null;
isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
isSharedViewIndex = table.getViewIndexId() != null;
@@ -463,7 +467,7 @@ public class UpsertCompiler {
// so we might be able to run it entirely on the server side.
// For a table with row timestamp column, we can't guarantee that the row key will reside in the
// region space managed by region servers. So we bail out on executing on server side.
- runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
+ runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
}
// If we may be able to run on the server, add a hint that favors using the data table
// if all else is equal.
@@ -678,12 +682,13 @@ public class UpsertCompiler {
ImmutableBytesWritable ptr = context.getTempPtr();
PTable table = tableRef.getTable();
table.getIndexMaintainers(ptr, context.getConnection());
+ byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
ServerCache cache = null;
try {
if (ptr.getLength() > 0) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
- cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
byte[] uuidValue = cache.getId();
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 33218ee..d720806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -57,6 +57,8 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import co.cask.tephra.Transaction;
+
import com.google.common.collect.ImmutableList;
@@ -86,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
public static final String REVERSE_SCAN = "_ReverseScan";
public static final String ANALYZE_TABLE = "_ANALYZETABLE";
+ public static final String TX_STATE = "_TxState";
public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES";
public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";
public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY";
@@ -228,7 +231,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final byte[][] viewConstants, final TupleProjector projector,
final ImmutableBytesWritable ptr) {
return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
- dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr);
+ dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
}
/**
@@ -236,13 +239,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
* re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
* for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
* the same from a custom filter.
- * @param arrayFuncRefs
* @param arrayKVRefs
+ * @param arrayFuncRefs
* @param offset starting position in the rowkey.
* @param scan
* @param tupleProjector
* @param dataRegion
* @param indexMaintainer
+ * @param tx current transaction
* @param viewConstants
*/
protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -250,6 +254,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final Expression[] arrayFuncRefs, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final Region dataRegion, final IndexMaintainer indexMaintainer,
+ Transaction tx,
final byte[][] viewConstants, final KeyValueSchema kvSchema,
final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
final ImmutableBytesWritable ptr) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7eb1dc6..0a313be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -60,6 +60,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
@@ -240,6 +241,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
+ private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
EMPTY_KEYVALUE_KV,
@@ -261,7 +263,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
INDEX_DISABLE_TIMESTAMP_KV,
STORE_NULLS_KV,
BASE_COLUMN_COUNT_KV,
- ROW_KEY_ORDER_OPTIMIZABLE_KV
+ ROW_KEY_ORDER_OPTIMIZABLE_KV,
+ TRANSACTIONAL_KV
);
static {
Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -285,6 +288,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV);
private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
+ private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -789,6 +793,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
+ Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
+ boolean transactional = transactionalKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(transactionalKv.getValueArray(), transactionalKv.getValueOffset(), transactionalKv.getValueLength()));
Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
@@ -844,7 +850,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp,
tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
- disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable);
+ disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable, transactional);
}
private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
@@ -2332,6 +2338,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
continue;
}
+ } else if (pkCount == COLUMN_NAME_INDEX &&
+ ! (Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 &&
+ Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) ) {
+ // Invalidate any table with mutations
+ // TODO: this likely means we don't need the above logic that
+ // loops through the indexes if adding a PK column, since we'd
+ // always have header rows for those.
+ invalidateList.add(new ImmutableBytesPtr(SchemaUtil
+ .getTableKey(tenantId,
+ rowKeyMetaData[SCHEMA_NAME_INDEX],
+ rowKeyMetaData[TABLE_NAME_INDEX])));
}
}
tableMetaData.addAll(mutationsForAddingColumnsToViews);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 7cc4123..ba06828 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -23,9 +23,9 @@ import java.util.List;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
-import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.schema.PColumn;
@@ -73,7 +73,9 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0 = MIN_TABLE_TIMESTAMP + 7;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0 = MIN_TABLE_TIMESTAMP + 9;
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 = MIN_TABLE_TIMESTAMP + 10;
+ // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0;
// TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
// a different code for every type of error.
// ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION