You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/10 08:41:52 UTC

phoenix git commit: Move transaction manager setup to BaseTest, mutable secondary index fixes, don't set scan time range for transactional tables

Repository: phoenix
Updated Branches:
  refs/heads/txn f844829c8 -> fb489debb


Move transaction manager setup to BaseTest, mutable secondary index fixes, don't set scan time range for transactional tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fb489deb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fb489deb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fb489deb

Branch: refs/heads/txn
Commit: fb489debbb0e98a398f5e819fb1548e56e0b7366
Parents: f844829
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Apr 9 23:41:46 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 9 23:41:46 2015 -0700

----------------------------------------------------------------------
 .../end2end/index/TxGlobalMutableIndexIT.java   |  1 +
 .../phoenix/transactions/TransactionIT.java     | 81 +++-----------------
 .../apache/phoenix/compile/PostDDLCompiler.java | 10 ++-
 .../phoenix/exception/SQLExceptionCode.java     |  1 +
 .../apache/phoenix/execute/BaseQueryPlan.java   | 22 +++---
 .../apache/phoenix/execute/MutationState.java   |  3 +-
 .../index/PhoenixTransactionalIndexer.java      | 48 ++++++------
 .../phoenix/query/QueryServicesOptions.java     |  2 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 12 +++
 .../apache/phoenix/util/TransactionUtil.java    |  4 +
 .../java/org/apache/phoenix/query/BaseTest.java | 76 +++++++++++++++++-
 11 files changed, 150 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
index 2d22b0f..a2e0412 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
@@ -27,6 +27,7 @@ import org.junit.BeforeClass;
 import com.google.common.collect.Maps;
 
 public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT {
+    
     @BeforeClass
     @Shadower(classBeingShadowed = BaseMutableIndexIT.class)
     public static void doSetup() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
index 9b50551..c3b9c2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -23,77 +23,18 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.DateUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
 import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import com.google.common.collect.Maps;
 
 public class TransactionIT extends BaseHBaseManagedTimeIT {
 	
 	private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
-	@ClassRule
-	public static TemporaryFolder tmpFolder = new TemporaryFolder();
-	
-	@BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-	public static void doSetup() throws Exception {
-	    Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
-        // drop HBase tables because TransactionProcessor will not allow you to delete rows
-        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-        
-		config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-		config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
-		config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-		config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-		
-		ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
-	    ZKClientService zkClient = ZKClientServices.delegate(
-	      ZKClients.reWatchOnExpire(
-	        ZKClients.retryOnFailure(
-	          ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
-	            .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
-	            		HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-	            .build(),
-	          RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
-	        )
-	      )
-	    );
-	    zkClient.startAndWait();
-
-	    DiscoveryService discovery = new ZKDiscoveryService(zkClient);
-	    final TransactionManager txManager = new TransactionManager(config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
-	    TransactionService txService = new TransactionService(config, zkClient, discovery, txManager);
-	    txService.startAndWait();
-	}
 	
 	@Before
     public void setUp() throws SQLException {
@@ -111,17 +52,17 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         stmt.setDate(6, date);
     }
 	
-	private void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
-		assertTrue(rs.next());
-		assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
-		assertEquals(rs.getString(2), "char" + String.valueOf(i));
-		assertEquals(rs.getInt(3), i);
-		assertEquals(rs.getInt(4), i);
-		assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
-		Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
-		assertEquals(rs.getDate(6), date);
-	}
-	
+//	private void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
+//		assertTrue(rs.next());
+//		assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
+//		assertEquals(rs.getString(2), "char" + String.valueOf(i));
+//		assertEquals(rs.getInt(3), i);
+//		assertEquals(rs.getInt(4), i);
+//		assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
+//		Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
+//		assertEquals(rs.getDate(6), date);
+//	}
+//	
 //	@Test
 //	public void testUpsert() throws Exception {
 //		Connection conn = DriverManager.getConnection(getUrl());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 6b61039..72a59c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 
@@ -161,7 +162,14 @@ public class PostDDLCompiler {
                         };
                         PhoenixStatement statement = new PhoenixStatement(connection);
                         StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
-                        ScanUtil.setTimeRange(scan, timestamp);
+                        long ts = timestamp;
+                        // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+                        // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+                        // in this case, so maybe this is ok.
+                        if (tableRef.getTable().isTransactional()) {
+                            ts = TransactionUtil.translateMillis(ts);
+                        }
+                        ScanUtil.setTimeRange(scan, ts);
                         if (emptyCF != null) {
                             scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2742079..517b3ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -250,6 +250,7 @@ public enum SQLExceptionCode {
     DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"),
     ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"),
     MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"),
+    STORE_NULLS_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d96e339..387f23d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -178,21 +178,25 @@ public abstract class BaseQueryPlan implements QueryPlan {
         // The time stamp comes from the server at compile time when the meta data
         // is resolved.
         // TODO: include time range in explain plan?
+        PTable table = context.getCurrentTable().getTable();
         PhoenixConnection connection = context.getConnection();
-        if (context.getScanTimeRange() == null) {
-          Long scn = connection.getSCN();
-          if (scn == null) {
-            scn = context.getCurrentTime();
-          }
-          ScanUtil.setTimeRange(scan, scn);
-        } else {
-            ScanUtil.setTimeRange(scan, context.getScanTimeRange());
+        // Timestamp is managed by Transaction Manager for transactional tables
+        if (!table.isTransactional()) {
+            if (context.getScanTimeRange() == null) {
+              Long scn = connection.getSCN();
+              if (scn == null) {
+                scn = context.getCurrentTime();
+              }
+              ScanUtil.setTimeRange(scan, scn);
+            } else {
+                ScanUtil.setTimeRange(scan, context.getScanTimeRange());
+            }
         }
         ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
         String customAnnotations = LogUtil.customAnnotationsToString(connection);
         ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : customAnnotations.getBytes());
         // Set local index related scan attributes. 
-        if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) {
+        if (table.getIndexType() == IndexType.LOCAL) {
             ScanUtil.setLocalIndex(scan);
             Set<PColumn> dataColumns = context.getDataColumns();
             // If any data columns to join back from data table are present then we set following attributes

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index b562cd8..59bc6ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -417,6 +417,7 @@ public class MutationState implements SQLCloseable {
                             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                             if (attribValue != null) {
                                 mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index dd71a58..f6e6806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -49,7 +49,6 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -57,6 +56,7 @@ import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
@@ -127,8 +127,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 this.writer.write(indexUpdates);
             }
         } catch (Throwable t) {
-            LOG.error("Failed to update index with entries:" + indexUpdates, t);
-            IndexManagementUtil.rethrowIndexingException(t);
+            String msg = "Failed to update index with entries:" + indexUpdates;
+            LOG.error(msg, t);
+            ServerUtil.throwIOException(msg, t);
         }
     }
 
@@ -166,6 +167,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             stored.addAll(m);
         }
         
+        Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
         try {
             if (!mutableColumns.isEmpty()) {
                 List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
@@ -182,36 +184,34 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 txTable.startTx(tx);
                 scanner = txTable.getScanner(scan);
             }
-        } finally {
-            if (txTable != null) txTable.close();
-        }
-        
-        Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
-        if (scanner == null) {
-            for (Mutation m : mutations.values()) {
-                TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
-                state.applyMutation(m);
-                Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
-                for (IndexUpdate update : updates) {
-                    indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+            if (scanner != null) {
+                Result result;
+                while ((result = scanner.next()) != null) {
+                    TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
+                    Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+                    for (IndexUpdate delete : deletes) {
+                        indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+                    }
+                    Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
+                    state.applyMutation(m);
+                    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
+                    for (IndexUpdate update : updates) {
+                        indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+                    }
                 }
             }
-        } else {
-            Result result;
-            while ((result = scanner.next()) != null) {
-                TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
-                Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
-                for (IndexUpdate delete : deletes) {
-                    indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
-                }
-                Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
+            for (Mutation m : mutations.values()) {
+                TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
                 state.applyMutation(m);
                 Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
                 for (IndexUpdate update : updates) {
                     indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
                 }
             }
+        } finally {
+            if (txTable != null) txTable.close();
         }
+        
         return indexUpdates;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1e0de9d..83944b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -108,7 +108,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
     public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY;
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
-    public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
+    public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
     // Only the first chunked batches are fetched in parallel, so this default
     // should be on the relatively bigger side of things. Bigger means more

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 54a5c7e..7c78089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1336,6 +1336,7 @@ public class MetaDataClient {
                     storeNulls = connection.getQueryServices().getProps().getBoolean(
                                     QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
                                     QueryServicesOptions.DEFAULT_STORE_NULLS);
+                    tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.valueOf(storeNulls));
                 }
             } else {
                 storeNulls = storeNullsProp;
@@ -1351,10 +1352,21 @@ public class MetaDataClient {
                     transactional = connection.getQueryServices().getProps().getBoolean(
                                     QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
                                     QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+                    tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, Boolean.valueOf(transactional));
                 } else {
                     transactional = transactionalProp;
                 }
             }
+            if (transactional) { // FIXME: remove once Tephra handles column deletes
+                if (Boolean.FALSE.equals(storeNullsProp)) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_FOR_TRANSACTIONAL)
+                    .setSchemaName(schemaName).setTableName(tableName)
+                    .build().buildException();
+                }
+                // Force STORE_NULLS to true when transactional as Tephra cannot deal with column deletes
+                storeNulls = true;
+                tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);
+            }
 
             // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
             if (statement.getTableType() == PTableType.VIEW || indexId != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 6058711..33079d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -37,6 +37,10 @@ public class TransactionUtil {
     
     private static final TransactionCodec codec = new TransactionCodec();
     
+    public static long translateMillis(long serverTimeStamp) {
+        return serverTimeStamp * 1000000;
+    }
+    
     public static byte[] encodeTxnState(Transaction txn) throws SQLException {
         try {
             return codec.encode(txn);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 699d350..cdb741d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -87,6 +87,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -106,9 +107,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.TransactionService;
+import co.cask.tephra.metrics.TxMetricsCollector;
+import co.cask.tephra.persist.InMemoryTransactionStateStorage;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -125,6 +133,7 @@ import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTableType;
@@ -137,6 +146,14 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,7 +198,11 @@ public abstract class BaseTest {
 	                "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) ";
 	private static final Map<String,String> tableDDLMap;
     private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
-
+    private static ZKClientService zkClient;
+    private static TransactionService txService;
+    @ClassRule
+    public static TemporaryFolder tmpFolder = new TemporaryFolder();
+    
     static {
         ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
         builder.put(ENTITY_HISTORY_TABLE_NAME,"create table " + ENTITY_HISTORY_TABLE_NAME +
@@ -458,10 +479,53 @@ public abstract class BaseTest {
         return url;
     }
     
-    protected static String checkClusterInitialized(ReadOnlyProps overrideProps) {
+    private static void teardownTxManager() throws SQLException {
+        try {
+            if (txService != null) txService.stopAndWait();
+        } finally {
+            try {
+                if (zkClient != null) zkClient.stopAndWait();
+            } finally {
+                txService = null;
+                zkClient = null;
+            }
+        }
+        
+    }
+    
+    private static void setupTxManager() throws SQLException, IOException {
+        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+//        config.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, ConnectionInfo.getZookeeperConnectionString(getUrl()));
+//        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tmp");
+
+        ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
+        zkClient = ZKClientServices.delegate(
+          ZKClients.reWatchOnExpire(
+            ZKClients.retryOnFailure(
+              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                .build(),
+              RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+            )
+          )
+        );
+        zkClient.startAndWait();
+
+        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+        TransactionManager txManager = new TransactionManager(config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+        txService = new TransactionService(config, zkClient, discovery, txManager);
+        txService.startAndWait();
+    }
+
+    protected static String checkClusterInitialized(ReadOnlyProps overrideProps) throws Exception {
         if (!clusterInitialized) {
             url = setUpTestCluster(config, overrideProps);
             clusterInitialized = true;
+            setupTxManager();
         }
         return url;
     }
@@ -507,8 +571,12 @@ public abstract class BaseTest {
                     utility.shutdownMiniCluster();
                 }
             } finally {
-                utility = null;
-                clusterInitialized = false;
+                try {
+                    teardownTxManager();
+                } finally {
+                    utility = null;
+                    clusterInitialized = false;
+                }
             }
         }
     }