You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/10 08:41:52 UTC
phoenix git commit: Move txn manager setup to BaseTest, mutable si fixes, don't set scan timerange for txnal tables
Repository: phoenix
Updated Branches:
refs/heads/txn f844829c8 -> fb489debb
Move txn manager setup to BaseTest, mutable si fixes, don't set scan timerange for txnal tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fb489deb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fb489deb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fb489deb
Branch: refs/heads/txn
Commit: fb489debbb0e98a398f5e819fb1548e56e0b7366
Parents: f844829
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Apr 9 23:41:46 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Apr 9 23:41:46 2015 -0700
----------------------------------------------------------------------
.../end2end/index/TxGlobalMutableIndexIT.java | 1 +
.../phoenix/transactions/TransactionIT.java | 81 +++-----------------
.../apache/phoenix/compile/PostDDLCompiler.java | 10 ++-
.../phoenix/exception/SQLExceptionCode.java | 1 +
.../apache/phoenix/execute/BaseQueryPlan.java | 22 +++---
.../apache/phoenix/execute/MutationState.java | 3 +-
.../index/PhoenixTransactionalIndexer.java | 48 ++++++------
.../phoenix/query/QueryServicesOptions.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 12 +++
.../apache/phoenix/util/TransactionUtil.java | 4 +
.../java/org/apache/phoenix/query/BaseTest.java | 76 +++++++++++++++++-
11 files changed, 150 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
index 2d22b0f..a2e0412 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
@@ -27,6 +27,7 @@ import org.junit.BeforeClass;
import com.google.common.collect.Maps;
public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT {
+
@BeforeClass
@Shadower(classBeingShadowed = BaseMutableIndexIT.class)
public static void doSetup() throws Exception {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
index 9b50551..c3b9c2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java
@@ -23,77 +23,18 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-
-import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.DateUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import com.google.common.collect.Maps;
public class TransactionIT extends BaseHBaseManagedTimeIT {
private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @BeforeClass
- @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
- public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
- // drop HBase tables because TransactionProcessor will not allow you to delete rows
- props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-
- config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
- config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
- config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
- config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
-
- ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
- ZKClientService zkClient = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
- .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
- HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
- .build(),
- RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
- )
- )
- );
- zkClient.startAndWait();
-
- DiscoveryService discovery = new ZKDiscoveryService(zkClient);
- final TransactionManager txManager = new TransactionManager(config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
- TransactionService txService = new TransactionService(config, zkClient, discovery, txManager);
- txService.startAndWait();
- }
@Before
public void setUp() throws SQLException {
@@ -111,17 +52,17 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
stmt.setDate(6, date);
}
- private void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
- assertTrue(rs.next());
- assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
- assertEquals(rs.getString(2), "char" + String.valueOf(i));
- assertEquals(rs.getInt(3), i);
- assertEquals(rs.getInt(4), i);
- assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
- Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
- assertEquals(rs.getDate(6), date);
- }
-
+// private void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
+// assertTrue(rs.next());
+// assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
+// assertEquals(rs.getString(2), "char" + String.valueOf(i));
+// assertEquals(rs.getInt(3), i);
+// assertEquals(rs.getInt(4), i);
+// assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
+// Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
+// assertEquals(rs.getDate(6), date);
+// }
+//
// @Test
// public void testUpsert() throws Exception {
// Connection conn = DriverManager.getConnection(getUrl());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 6b61039..72a59c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
@@ -161,7 +162,14 @@ public class PostDDLCompiler {
};
PhoenixStatement statement = new PhoenixStatement(connection);
StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
- ScanUtil.setTimeRange(scan, timestamp);
+ long ts = timestamp;
+ // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+ // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+ // in this case, so maybe this is ok.
+ if (tableRef.getTable().isTransactional()) {
+ ts = TransactionUtil.translateMillis(ts);
+ }
+ ScanUtil.setTimeRange(scan, ts);
if (emptyCF != null) {
scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2742079..517b3ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -250,6 +250,7 @@ public enum SQLExceptionCode {
DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"),
ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"),
MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"),
+ STORE_NULLS_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d96e339..387f23d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -178,21 +178,25 @@ public abstract class BaseQueryPlan implements QueryPlan {
// The time stamp comes from the server at compile time when the meta data
// is resolved.
// TODO: include time range in explain plan?
+ PTable table = context.getCurrentTable().getTable();
PhoenixConnection connection = context.getConnection();
- if (context.getScanTimeRange() == null) {
- Long scn = connection.getSCN();
- if (scn == null) {
- scn = context.getCurrentTime();
- }
- ScanUtil.setTimeRange(scan, scn);
- } else {
- ScanUtil.setTimeRange(scan, context.getScanTimeRange());
+ // Timestamp is managed by Transaction Manager for transactional tables
+ if (!table.isTransactional()) {
+ if (context.getScanTimeRange() == null) {
+ Long scn = connection.getSCN();
+ if (scn == null) {
+ scn = context.getCurrentTime();
+ }
+ ScanUtil.setTimeRange(scan, scn);
+ } else {
+ ScanUtil.setTimeRange(scan, context.getScanTimeRange());
+ }
}
ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes());
String customAnnotations = LogUtil.customAnnotationsToString(connection);
ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : customAnnotations.getBytes());
// Set local index related scan attributes.
- if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) {
+ if (table.getIndexType() == IndexType.LOCAL) {
ScanUtil.setLocalIndex(scan);
Set<PColumn> dataColumns = context.getDataColumns();
// If any data columns to join back from data table are present then we set following attributes
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index b562cd8..59bc6ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import co.cask.tephra.TxConstants;
import co.cask.tephra.hbase98.TransactionAwareHTable;
import org.apache.hadoop.hbase.HConstants;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -417,6 +417,7 @@ public class MutationState implements SQLCloseable {
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
if (attribValue != null) {
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index dd71a58..f6e6806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -49,7 +49,6 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.types.PVarbinary;
@@ -57,6 +56,7 @@ import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
@@ -127,8 +127,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
this.writer.write(indexUpdates);
}
} catch (Throwable t) {
- LOG.error("Failed to update index with entries:" + indexUpdates, t);
- IndexManagementUtil.rethrowIndexingException(t);
+ String msg = "Failed to update index with entries:" + indexUpdates;
+ LOG.error(msg, t);
+ ServerUtil.throwIOException(msg, t);
}
}
@@ -166,6 +167,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
stored.addAll(m);
}
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
try {
if (!mutableColumns.isEmpty()) {
List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
@@ -182,36 +184,34 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
txTable.startTx(tx);
scanner = txTable.getScanner(scan);
}
- } finally {
- if (txTable != null) txTable.close();
- }
-
- Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
- if (scanner == null) {
- for (Mutation m : mutations.values()) {
- TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
- state.applyMutation(m);
- Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
- for (IndexUpdate update : updates) {
- indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ if (scanner != null) {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
+ Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+ for (IndexUpdate delete : deletes) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+ }
+ Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
+ state.applyMutation(m);
+ Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
+ for (IndexUpdate update : updates) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+ }
}
}
- } else {
- Result result;
- while ((result = scanner.next()) != null) {
- TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result);
- Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
- for (IndexUpdate delete : deletes) {
- indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
- }
- Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow()));
+ for (Mutation m : mutations.values()) {
+ TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m);
state.applyMutation(m);
Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData);
for (IndexUpdate update : updates) {
indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
}
}
+ } finally {
+ if (txTable != null) txTable.close();
}
+
return indexUpdates;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1e0de9d..83944b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -108,7 +108,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = DEFAULT_MAX_QUERY_CONCURRENCY;
public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
- public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
+ public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
// Only the first chunked batches are fetched in parallel, so this default
// should be on the relatively bigger side of things. Bigger means more
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 54a5c7e..7c78089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1336,6 +1336,7 @@ public class MetaDataClient {
storeNulls = connection.getQueryServices().getProps().getBoolean(
QueryServices.DEFAULT_STORE_NULLS_ATTRIB,
QueryServicesOptions.DEFAULT_STORE_NULLS);
+ tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.valueOf(storeNulls));
}
} else {
storeNulls = storeNullsProp;
@@ -1351,10 +1352,21 @@ public class MetaDataClient {
transactional = connection.getQueryServices().getProps().getBoolean(
QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+ tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, Boolean.valueOf(transactional));
} else {
transactional = transactionalProp;
}
}
+ if (transactional) { // FIXME: remove once Tephra handles column deletes
+ if (Boolean.FALSE.equals(storeNullsProp)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STORE_NULLS_FOR_TRANSACTIONAL)
+ .setSchemaName(schemaName).setTableName(tableName)
+ .build().buildException();
+ }
+ // Force STORE_NULLS to true when transactional as Tephra cannot deal with column deletes
+ storeNulls = true;
+ tableProps.put(PhoenixDatabaseMetaData.STORE_NULLS, Boolean.TRUE);
+ }
// Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
if (statement.getTableType() == PTableType.VIEW || indexId != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 6058711..33079d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -37,6 +37,10 @@ public class TransactionUtil {
private static final TransactionCodec codec = new TransactionCodec();
+ public static long translateMillis(long serverTimeStamp) {
+ return serverTimeStamp * 1000000;
+ }
+
public static byte[] encodeTxnState(Transaction txn) throws SQLException {
try {
return codec.encode(txn);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fb489deb/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 699d350..cdb741d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -87,6 +87,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -106,9 +107,16 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.TransactionService;
+import co.cask.tephra.metrics.TxMetricsCollector;
+import co.cask.tephra.persist.InMemoryTransactionStateStorage;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -125,6 +133,7 @@ import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PTableType;
@@ -137,6 +146,14 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,7 +198,11 @@ public abstract class BaseTest {
" CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) ";
private static final Map<String,String> tableDDLMap;
private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
-
+ private static ZKClientService zkClient;
+ private static TransactionService txService;
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
static {
ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
builder.put(ENTITY_HISTORY_TABLE_NAME,"create table " + ENTITY_HISTORY_TABLE_NAME +
@@ -458,10 +479,53 @@ public abstract class BaseTest {
return url;
}
- protected static String checkClusterInitialized(ReadOnlyProps overrideProps) {
+ private static void teardownTxManager() throws SQLException {
+ try {
+ if (txService != null) txService.stopAndWait();
+ } finally {
+ try {
+ if (zkClient != null) zkClient.stopAndWait();
+ } finally {
+ txService = null;
+ zkClient = null;
+ }
+ }
+
+ }
+
+ private static void setupTxManager() throws SQLException, IOException {
+ config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+ config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+ config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+ config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+// config.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, ConnectionInfo.getZookeeperConnectionString(getUrl()));
+// config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tmp");
+
+ ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
+ zkClient = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+ .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+ HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+ .build(),
+ RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+ )
+ )
+ );
+ zkClient.startAndWait();
+
+ DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+ TransactionManager txManager = new TransactionManager(config, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+ txService = new TransactionService(config, zkClient, discovery, txManager);
+ txService.startAndWait();
+ }
+
+ protected static String checkClusterInitialized(ReadOnlyProps overrideProps) throws Exception {
if (!clusterInitialized) {
url = setUpTestCluster(config, overrideProps);
clusterInitialized = true;
+ setupTxManager();
}
return url;
}
@@ -507,8 +571,12 @@ public abstract class BaseTest {
utility.shutdownMiniCluster();
}
} finally {
- utility = null;
- clusterInitialized = false;
+ try {
+ teardownTxManager();
+ } finally {
+ utility = null;
+ clusterInitialized = false;
+ }
}
}
}