You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/14 07:37:29 UTC
[13/21] phoenix git commit: PHOENIX-4605 Support running multiple transaction providers
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a7b31e8..1a11427 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -69,6 +69,7 @@ import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -134,7 +135,7 @@ public class PTableImpl implements PTable {
private boolean disableWAL;
private boolean multiTenant;
private boolean storeNulls;
- private boolean isTransactional;
+ private TransactionFactory.Provider transactionProvider;
private ViewType viewType;
private Short viewIndexId;
private int estimatedSize;
@@ -227,7 +228,7 @@ public class PTableImpl implements PTable {
init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
this.schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
- isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
+ transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
}
public PTableImpl(long timeStamp) { // For delete marker
@@ -270,7 +271,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(),
indexes, table.isImmutableRows(), physicalNames, table.getDefaultFamilyName(), viewStatement,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency,
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), updateCacheFrequency,
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -280,7 +281,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -290,7 +291,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -300,7 +301,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -310,7 +311,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -320,7 +321,7 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(),
table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -330,18 +331,18 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(),
+ table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(),
table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
- boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
+ boolean isMultitenant, boolean storeNulls, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), transactionProvider, updateCacheFrequency, table.getIndexDisableTimestamp(),
isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -352,7 +353,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -363,7 +364,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(),
+ table.getBaseColumnCount(), rowKeyOrderOptimizable, table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(),
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -374,7 +375,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(),
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(),
table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
}
@@ -383,12 +384,12 @@ public class PTableImpl implements PTable {
Collection<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
- IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+ IndexType indexType, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency,
long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
- indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
+ indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, transactionProvider,
updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
}
@@ -397,7 +398,7 @@ public class PTableImpl implements PTable {
Collection<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
- IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+ IndexType indexType, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency,
int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped,
String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme,
QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization)
@@ -405,7 +406,7 @@ public class PTableImpl implements PTable {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
- indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency,
+ indexType, baseColumnCount, rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency,
indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
}
@@ -414,13 +415,13 @@ public class PTableImpl implements PTable {
PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
- int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+ int baseColumnCount, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency,
long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme,
QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException {
init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
- isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme,
+ transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme,
qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
}
@@ -454,7 +455,7 @@ public class PTableImpl implements PTable {
PName pkName, Integer bucketNum, Collection<PColumn> columns, PName parentSchemaName, PName parentTableName,
List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
- IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp,
+ IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, long indexDisableTimestamp,
boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme,
EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException {
Preconditions.checkNotNull(schemaName);
@@ -486,7 +487,7 @@ public class PTableImpl implements PTable {
this.viewType = viewType;
this.viewIndexId = viewIndexId;
this.indexType = indexType;
- this.isTransactional = isTransactional;
+ this.transactionProvider = transactionProvider;
this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
this.updateCacheFrequency = updateCacheFrequency;
this.isNamespaceMapped = isNamespaceMapped;
@@ -1279,7 +1280,13 @@ public class PTableImpl implements PTable {
boolean disableWAL = table.getDisableWAL();
boolean multiTenant = table.getMultiTenant();
boolean storeNulls = table.getStoreNulls();
- boolean isTransactional = table.getTransactional();
+ TransactionFactory.Provider transactionProvider = null;
+ if (table.hasTransactionProvider()) {
+ transactionProvider = TransactionFactory.Provider.fromCode(table.getTransactionProvider());
+ } else if (table.hasTransactional()) {
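+ // For backward compatibility: tables serialized before the transactionProvider field existed carry only the legacy transactional flag, which is mapped to Tephra here. NOTE(review): only the flag's presence is checked, not its value - confirm.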
+ transactionProvider = TransactionFactory.Provider.TEPHRA;
+ }
ViewType viewType = null;
String viewStatement = null;
List<PName> physicalNames = Collections.emptyList();
@@ -1352,7 +1359,7 @@ public class PTableImpl implements PTable {
(bucketNum == NO_SALTING) ? null : bucketNum, columns, parentSchemaName, parentTableName, indexes,
isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
- isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName,
+ transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName,
isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedColumnQualifierCounter, useStatsForParallelization);
return result;
} catch (SQLException e) {
@@ -1418,7 +1425,9 @@ public class PTableImpl implements PTable {
builder.setDisableWAL(table.isWALDisabled());
builder.setMultiTenant(table.isMultiTenant());
builder.setStoreNulls(table.getStoreNulls());
- builder.setTransactional(table.isTransactional());
+ if (table.getTransactionProvider() != null) {
+ builder.setTransactionProvider(table.getTransactionProvider().getCode());
+ }
if(table.getType() == PTableType.VIEW){
builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()}));
}
@@ -1473,8 +1482,13 @@ public class PTableImpl implements PTable {
}
@Override
- public boolean isTransactional() {
- return isTransactional;
+ public TransactionFactory.Provider getTransactionProvider() {
+ return transactionProvider;
+ }
+
+ @Override
+ public final boolean isTransactional() {
+ return transactionProvider != null;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index c500b2e..78b9beb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.SchemaUtil;
public enum TableProperty {
@@ -94,6 +95,23 @@ public enum TableProperty {
}
},
+ TRANSACTION_PROVIDER(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+ @Override
+ public Object getPTableValue(PTable table) {
+ return table.getTransactionProvider();
+ }
+ @Override
+ public Object getValue(Object value) {
+ try {
+ return value == null ? null : TransactionFactory.Provider.valueOf(value.toString());
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_TRANSACTION_PROVIDER)
+ .setMessage(value.toString())
+ .build().buildException());
+ }
+ }
+ },
+
UPDATE_CACHE_FREQUENCY(PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY, true, true, true) {
@Override
public Object getValue(Object value) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 110868e..543eda1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -17,17 +17,11 @@
*/
package org.apache.phoenix.transaction;
-import java.io.IOException;
import java.sql.SQLException;
-import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
public class OmidTransactionContext implements PhoenixTransactionContext {
@@ -56,7 +50,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
}
@Override
- public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException {
+ public void commitDDLFence(PTable dataTable) throws SQLException {
// TODO Auto-generated method stub
}
@@ -116,59 +110,24 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
}
@Override
- public long getMaxTransactionsPerSecond() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public boolean isPreExistingVersion(long version) {
- // TODO Auto-generated method stub
- return false;
+ public Provider getProvider() {
+ return Provider.OMID;
}
@Override
- public BaseRegionObserver getCoprocessor() {
- // TODO Auto-generated method stub
+ public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) {
return null;
}
@Override
- public void setInMemoryTransactionClient(Configuration config) {
+ public void markDMLFence(PTable dataTable) {
// TODO Auto-generated method stub
}
@Override
- public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props,
- ConnectionInfo connectionInfo) {
- // TODO Auto-generated method stub
-
- return null;
-
- }
-
- @Override
- public byte[] getFamilyDeleteMarker() {
+ public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
// TODO Auto-generated method stub
return null;
}
-
- @Override
- public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setupTxManager(Configuration config, String url) throws SQLException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void tearDownTxManager() {
- // TODO Auto-generated method stub
-
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index b0c1bfe..c211661 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -19,34 +19,28 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
-public class OmidTransactionProvider implements TransactionProvider {
+public class OmidTransactionProvider implements PhoenixTransactionProvider {
private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
-
+
public static final OmidTransactionProvider getInstance() {
return INSTANCE;
}
-
+
private OmidTransactionProvider() {
}
-
- @Override
- public PhoenixTransactionContext getTransactionContext() {
- return new OmidTransactionContext();
- }
@Override
public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
//return new OmidTransactionContext(txnBytes);
return null;
}
-
+
@Override
public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
//return new OmidTransactionContext(connection);
@@ -54,25 +48,37 @@ public class OmidTransactionProvider implements TransactionProvider {
}
@Override
- public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
- //return new OmidTransactionContext(contex, connection, subTask);
+ public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) {
+ // TODO Auto-generated method stub
return null;
}
@Override
- public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
- //return new OmidTransactionTable(ctx, htable);
+ public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo) {
+ // TODO Auto-generated method stub
return null;
}
-
+
@Override
- public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
- return CellUtil.createCell(row, family, HConstants.EMPTY_BYTE_ARRAY, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ public Class<? extends RegionObserver> getCoprocessor() {
+ // TODO Auto-generated method stub
+ return null;
}
-
+
@Override
- public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
- return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ public Provider getProvider() {
+ return TransactionFactory.Provider.OMID;
}
+ @Override
+ public boolean isUnsupported(Feature feature) {
+ // FIXME: if we initialize a Set with the unsupported features
+ // and check for containment, we run into a test failure
+ // in SetPropertyOnEncodedTableIT.testSpecifyingColumnFamilyForTTLFails()
+ // due to TableProperty.colFamSpecifiedException being null
+ // (though it's set in the constructor). I suspect some
+ // mysterious class loader issue. The below works fine
+ // as a workaround.
+ return (feature == Feature.ALTER_NONTX_TO_TX);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
deleted file mode 100644
index 0662555..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
-public class OmidTransactionTable implements PhoenixTransactionalTable {
-
- public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public Result get(Get get) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void put(Put put) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public byte[] getTableName() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Configuration getConfiguration() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public Result[] get(List<Get> gets) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier)
- throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void delete(List<Delete> deletes) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush) {
- // TODO Auto-generated method stub
- }
-
- @Override
- public boolean isAutoFlush() {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public long getWriteBufferSize() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void setWriteBufferSize(long writeBufferSize) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void flushCommits() throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void close() throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family,
- byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- // TODO Auto-generated method stub
- }
-
- @Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public TableName getName() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public boolean[] existsAll(List<Get> gets) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void batch(List<? extends Row> actions, Object[] results)
- throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public Object[] batch(List<? extends Row> actions) throws IOException,
- InterruptedException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> actions,
- Object[] results, Callback<R> callback) throws IOException,
- InterruptedException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public <R> Object[] batchCallback(List<? extends Row> actions,
- Callback<R> callback) throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Put put) throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, Put put) throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Delete delete) throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, Delete delete)
- throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void mutateRow(RowMutations rm) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public Result append(Append append) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family,
- byte[] qualifier, long amount) throws IOException {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family,
- byte[] qualifier, long amount, Durability durability)
- throws IOException {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] row) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(
- Class<T> service, byte[] startKey, byte[] endKey,
- Call<T, R> callable) throws ServiceException, Throwable {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public <T extends Service, R> void coprocessorService(Class<T> service,
- byte[] startKey, byte[] endKey, Call<T, R> callable,
- Callback<R> callback) throws ServiceException, Throwable {
- // TODO Auto-generated method stub
- }
-
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(
- MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype)
- throws ServiceException, Throwable {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public <R extends Message> void batchCoprocessorService(
- MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype,
- Callback<R> callback) throws ServiceException, Throwable {
- // TODO Auto-generated method stub
- }
-
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, RowMutations mutation)
- throws IOException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public int getOperationTimeout() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public int getRpcTimeout() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public void setOperationTimeout(int arg0) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setRpcTimeout(int arg0) {
- // TODO Auto-generated method stub
-
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java
new file mode 100644
index 0000000..f12f818
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.Closeable;
+
+public interface PhoenixTransactionClient extends Closeable {
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 52ff2f9..f3ad42f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -17,20 +17,98 @@
*/
package org.apache.phoenix.transaction;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
-
-import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
-public interface PhoenixTransactionContext {
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
+public interface PhoenixTransactionContext {
+ public static PhoenixTransactionContext NULL_CONTEXT = new PhoenixTransactionContext() {
+
+ @Override
+ public void begin() throws SQLException {
+ }
+
+ @Override
+ public void commit() throws SQLException {
+ }
+
+ @Override
+ public void abort() throws SQLException {
+ }
+
+ @Override
+ public void checkpoint(boolean hasUncommittedData) throws SQLException {
+ }
+
+ @Override
+ public void commitDDLFence(PTable dataTable) throws SQLException {
+ }
+
+ @Override
+ public void join(PhoenixTransactionContext ctx) {
+ }
+
+ @Override
+ public boolean isTransactionRunning() {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public long getTransactionId() {
+ return 0;
+ }
+
+ @Override
+ public long getReadPointer() {
+ return 0;
+ }
+
+ @Override
+ public long getWritePointer() {
+ return 0;
+ }
+
+ @Override
+ public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+ }
+
+ @Override
+ public PhoenixVisibilityLevel getVisibilityLevel() {
+ return null;
+ }
+
+ @Override
+ public byte[] encodeTransaction() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Provider getProvider() {
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) {
+ return NULL_CONTEXT;
+ }
+
+ @Override
+ public void markDMLFence(PTable dataTable) {
+
+ }
+
+ @Override
+ public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
+ return null;
+ }
+ };
/**
*
* Visibility levels needed for checkpointing and
@@ -49,22 +127,6 @@ public interface PhoenixTransactionContext {
public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing";
/**
- * Set the in memory client connection to the transaction manager (for testing purpose)
- *
- * @param config
- */
- public void setInMemoryTransactionClient(Configuration config);
-
- /**
- * Set the client connection to the transaction manager
- *
- * @param config
- * @param props
- * @param connectionInfo
- */
- public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo);
-
- /**
* Starts a transaction
*
* @throws SQLException
@@ -84,7 +146,7 @@ public interface PhoenixTransactionContext {
* @throws SQLException
*/
public void abort() throws SQLException;
-
+
/**
* Create a checkpoint in a transaction as defined in [TEPHRA-96]
* @throws SQLException
@@ -100,9 +162,17 @@ public interface PhoenixTransactionContext {
* @throws InterruptedException
* @throws TimeoutException
*/
- public void commitDDLFence(PTable dataTable, Logger logger)
+ public void commitDDLFence(PTable dataTable)
throws SQLException;
+
+ /**
+ * Mark the start of DML to ensure that updates to indexed rows are not
+ * missed.
+ * @param dataTable the table on which DML command is working
+ */
+ public void markDMLFence(PTable dataTable);
+
/**
* Augment the current context with ctx modified keys
*
@@ -121,7 +191,8 @@ public interface PhoenixTransactionContext {
public void reset();
/**
- * Returns transaction unique identifier
+ * Returns transaction unique identifier which is also
+ * assumed to be the earliest write pointer.
*/
public long getTransactionId();
@@ -150,42 +221,8 @@ public interface PhoenixTransactionContext {
*/
public byte[] encodeTransaction() throws SQLException;
- /**
- *
- * @return max transactions per second
- */
- public long getMaxTransactionsPerSecond();
+ public Provider getProvider();
+ public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask);
- /**
- *
- * @param version
- */
- public boolean isPreExistingVersion(long version);
-
- /**
- *
- * @return the coprocessor
- */
- public BaseRegionObserver getCoprocessor();
-
- /**
- *
- * @return the family delete marker
- */
- public byte[] getFamilyDeleteMarker();
-
- /**
- * Setup transaction manager's configuration for testing
- */
- public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException;
-
- /**
- * Setup transaction manager for testing
- */
- public void setupTxManager(Configuration config, String url) throws SQLException;
-
- /**
- * Tear down transaction manager for testing
- */
- public void tearDownTxManager();
+ public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
new file mode 100644
index 0000000..cdc6058
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+
+public interface PhoenixTransactionProvider {
+ public enum Feature {
+ ALTER_NONTX_TO_TX(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL);
+
+ private final SQLExceptionCode code;
+
+ Feature(SQLExceptionCode code) {
+ this.code = code;
+ }
+
+ public SQLExceptionCode getCode() {
+ return code;
+ }
+ }
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException;
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection);
+
+ public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo);
+ public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo);
+ public Class<? extends RegionObserver> getCoprocessor();
+
+ public TransactionFactory.Provider getProvider();
+ public boolean isUnsupported(Feature feature);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java
new file mode 100644
index 0000000..10c46e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.Closeable;
+
+public interface PhoenixTransactionService extends Closeable {
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
deleted file mode 100644
index 7af1c08..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-
-import java.io.IOException;
-import java.util.List;
-
-public interface PhoenixTransactionalTable extends HTableInterface {
-
- /**
- * Transaction version of {@link HTableInterface#get(Get get)}
- * @param get
- * @throws IOException
- */
- public Result get(Get get) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#put(Put put)}
- * @param put
- * @throws IOException
- */
- public void put(Put put) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#delete(Delete delete)}
- *
- * @param delete
- * @throws IOException
- */
- public void delete(Delete delete) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#getScanner(Scan scan)}
- *
- * @param scan
- * @return ResultScanner
- * @throws IOException
- */
- public ResultScanner getScanner(Scan scan) throws IOException;
-
- /**
- * Returns Htable name
- */
- public byte[] getTableName();
-
- /**
- * Returns Htable configuration object
- */
- public Configuration getConfiguration();
-
- /**
- * Returns HTableDescriptor of Htable
- * @throws IOException
- */
- public HTableDescriptor getTableDescriptor() throws IOException;
-
- /**
- * Checks if cell exists
- * @throws IOException
- */
- public boolean exists(Get get) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#get(List gets)}
- * @throws IOException
- */
- public Result[] get(List<Get> gets) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#getScanner(byte[] family)}
- * @throws IOException
- */
- public ResultScanner getScanner(byte[] family) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)}
- * @throws IOException
- */
- public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#put(List puts)}
- * @throws IOException
- */
- public void put(List<Put> puts) throws IOException;
-
- /**
- * Transactional version of {@link HTableInterface#delete(List deletes)}
- * @throws IOException
- */
- public void delete(List<Delete> deletes) throws IOException;
-
- /**
- * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
- */
- public void setAutoFlush(boolean autoFlush);
-
- /**
- * Delegates to {@link HTable#isAutoFlush()}
- */
- public boolean isAutoFlush();
-
- /**
- * Delegates to see HTable.getWriteBufferSize()
- */
- public long getWriteBufferSize();
-
- /**
- * Delegates to see HTable.setWriteBufferSize()
- */
- public void setWriteBufferSize(long writeBufferSize) throws IOException;
-
- /**
- * Delegates to see HTable.flushCommits()
- */
- public void flushCommits() throws IOException;
-
- /**
- * Releases resources
- * @throws IOException
- */
- public void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 77c3ab6..8b16210 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -19,71 +19,46 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient;
import org.apache.tephra.Transaction;
+import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionServiceClient;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.util.TxUtils;
+import org.apache.tephra.hbase.TransactionAwareHTable;
import org.apache.tephra.visibility.FenceWait;
import org.apache.tephra.visibility.VisibilityFence;
-import org.apache.tephra.zookeeper.TephraZKClientService;
-import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.inject.util.Providers;
-
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class TephraTransactionContext implements PhoenixTransactionContext {
+import com.google.common.collect.Lists;
+public class TephraTransactionContext implements PhoenixTransactionContext {
+ private static final Logger logger = LoggerFactory.getLogger(TephraTransactionContext.class);
private static final TransactionCodec CODEC = new TransactionCodec();
- private static TransactionSystemClient txClient = null;
- private static ZKClientService zkClient = null;
- private static TransactionService txService = null;
- private static TransactionManager txManager = null;
-
private final List<TransactionAware> txAwares;
private final TransactionContext txContext;
private Transaction tx;
private TransactionSystemClient txServiceClient;
- private TransactionFailureException e;
public TephraTransactionContext() {
this.txServiceClient = null;
@@ -93,21 +68,21 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
public TephraTransactionContext(byte[] txnBytes) throws IOException {
this();
- this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC
- .decode(txnBytes) : null;
+ this.tx = CODEC.decode(txnBytes);
}
public TephraTransactionContext(PhoenixConnection connection) {
- this.txServiceClient = txClient;
+ PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());
+ assert (client instanceof TephraTransactionClient);
+ this.txServiceClient = ((TephraTransactionClient)client).getTransactionClient();
this.txAwares = Collections.emptyList();
this.txContext = new TransactionContext(txServiceClient);
}
- public TephraTransactionContext(PhoenixTransactionContext ctx,
- PhoenixConnection connection, boolean subTask) {
- this.txServiceClient = txClient;
+ private TephraTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
assert (ctx instanceof TephraTransactionContext);
TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
+ this.txServiceClient = tephraTransactionContext.txServiceClient;
if (subTask) {
this.tx = tephraTransactionContext.getTransaction();
@@ -117,42 +92,13 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
this.txAwares = Collections.emptyList();
this.txContext = tephraTransactionContext.getContext();
}
-
- this.e = null;
- }
-
- @Override
- public void setInMemoryTransactionClient(Configuration config) {
- TransactionManager txnManager = new TransactionManager(config);
- txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager);
}
@Override
- public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) {
- String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
- if (zkQuorumServersString==null) {
- zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
- }
-
- int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
- // Create instance of the tephra zookeeper client
- ZKClientService txZKClientService = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- new TephraZKClientService(zkQuorumServersString, timeOut, null,
- ArrayListMultimap.<String, byte[]>create()),
- RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
- )
- );
- txZKClientService.startAndWait();
- ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
- PooledClientProvider pooledClientProvider = new PooledClientProvider(
- config, zkDiscoveryService);
- txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
-
- return txZKClientService;
+ public TransactionFactory.Provider getProvider() { // Identifies the transaction provider this context belongs to.
+ return TransactionFactory.Provider.TEPHRA; // constant: always the TEPHRA provider
}
-
+
@Override
public void begin() throws SQLException {
if (txContext == null) {
@@ -181,8 +127,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
try {
txContext.finish();
} catch (TransactionFailureException e) {
- this.e = e;
-
if (e instanceof TransactionConflictException) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
@@ -204,14 +148,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
}
try {
- if (e != null) {
- txContext.abort(e);
- e = null;
- } else {
- txContext.abort();
- }
+ txContext.abort();
} catch (TransactionFailureException e) {
- this.e = null;
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
@@ -249,7 +187,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
}
@Override
- public void commitDDLFence(PTable dataTable, Logger logger)
+ public void commitDDLFence(PTable dataTable)
throws SQLException {
byte[] key = dataTable.getName().getBytes();
@@ -276,7 +214,12 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
}
}
+ @Override
public void markDMLFence(PTable table) {
+ if (table.getType() == PTableType.INDEX) {
+ return;
+ }
+
byte[] logicalKey = table.getName().getBytes();
TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
@@ -300,6 +243,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
@Override
public void join(PhoenixTransactionContext ctx) {
+ if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
+ return;
+ }
assert (ctx instanceof TephraTransactionContext);
TephraTransactionContext tephraContext = (TephraTransactionContext) ctx;
@@ -325,7 +271,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
public void reset() {
tx = null;
txAwares.clear();
- this.e = null;
}
@Override
@@ -406,86 +351,15 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
assert (tx != null);
try {
- return CODEC.encode(tx);
+ byte[] encodedTxBytes = CODEC.encode(tx);
+ encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1);
+ encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
+ return encodedTxBytes;
} catch (IOException e) {
throw new SQLException(e);
}
}
- @Override
- public long getMaxTransactionsPerSecond() {
- return TxConstants.MAX_TX_PER_MS;
- }
-
- @Override
- public boolean isPreExistingVersion(long version) {
- return TxUtils.isPreExistingVersion(version);
- }
-
- @Override
- public BaseRegionObserver getCoprocessor() {
- return new TransactionProcessor();
- }
-
- @Override
- public byte[] getFamilyDeleteMarker() {
- return TxConstants.FAMILY_DELETE_QUALIFIER;
- }
-
- @Override
- public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
- config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
- config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
- config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
- config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
- config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
- config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds);
- config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
- config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
- }
-
- @Override
- public void setupTxManager(Configuration config, String url) throws SQLException {
-
- if (txService != null) {
- return;
- }
-
- ConnectionInfo connInfo = ConnectionInfo.create(url);
- zkClient = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
- .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
- HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
- .build(),
- RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
- )
- )
- );
- zkClient.startAndWait();
-
- DiscoveryService discovery = new ZKDiscoveryService(zkClient);
- txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
- txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
- txService.startAndWait();
- }
-
- @Override
- public void tearDownTxManager() {
- try {
- if (txService != null) txService.stopAndWait();
- } finally {
- try {
- if (zkClient != null) zkClient.stopAndWait();
- } finally {
- txService = null;
- zkClient = null;
- txManager = null;
- }
- }
- }
-
/**
* TephraTransactionContext specific functions
*/
@@ -511,4 +385,17 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
txAware.startTx(tx);
}
}
+
+ @Override
+ public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) { // Creates a new Tephra context derived from an existing one.
+ return new TephraTransactionContext(context, subTask); // private (ctx, subTask) constructor; it asserts that context is a TephraTransactionContext
+ }
+
+ @Override
+ public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) { // Wraps htable in a TransactionAwareHTable and returns the wrapper.
+ TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, isImmutable ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); // immutable tables get ConflictDetection.NONE, all others ConflictDetection.ROW
+ this.addTransactionAware(transactionAwareHTable); // hand the wrapper to this context (presumably so it takes part in the current transaction - confirm against addTransactionAware)
+ return transactionAwareHTable;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
index 795be9f..2e52efa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -18,16 +18,37 @@
package org.apache.phoenix.transaction;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
-public class TephraTransactionProvider implements TransactionProvider {
+import com.google.common.collect.ArrayListMultimap;
+import com.google.inject.util.Providers;
+
+public class TephraTransactionProvider implements PhoenixTransactionProvider {
private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider();
public static final TephraTransactionProvider getInstance() {
@@ -37,12 +58,6 @@ public class TephraTransactionProvider implements TransactionProvider {
private TephraTransactionProvider() {
}
-
- @Override
- public PhoenixTransactionContext getTransactionContext() {
- return new TephraTransactionContext();
- }
-
@Override
public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
return new TephraTransactionContext(txnBytes);
@@ -54,23 +69,129 @@ public class TephraTransactionProvider implements TransactionProvider {
}
@Override
- public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
- return new TephraTransactionContext(contex, connection, subTask);
+ public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) { // Builds a Tephra transaction client for the given connection info.
+ if (connectionInfo.isConnectionless()) { // connectionless: use an in-memory client, no ZooKeeper client is created
+ TransactionManager txnManager = new TransactionManager(config);
+ TransactionSystemClient txClient = new InMemoryTxSystemClient(txnManager);
+ return new TephraTransactionClient(txClient); // single-argument constructor: the wrapper's zkClient is null
+
+ }
+ String zkQuorumServersString = config.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+ if (zkQuorumServersString==null) { // no Tephra-specific quorum configured: fall back to the connection's ZooKeeper string
+ zkQuorumServersString = connectionInfo.getZookeeperConnectionString();
+ }
+
+ int timeOut = config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+ // Create instance of the tephra zookeeper client
+ ZKClientService zkClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ new TephraZKClientService(zkQuorumServersString, timeOut, null,
+ ArrayListMultimap.<String, byte[]>create()),
+ RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+ )
+ );
+ // The ZooKeeper client is not started here; client.start() below starts it.
+ ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+ PooledClientProvider pooledClientProvider = new PooledClientProvider(
+ config, zkDiscoveryService);
+ TransactionServiceClient txClient = new TransactionServiceClient(config,pooledClientProvider);
+ TephraTransactionClient client = new TephraTransactionClient(zkClientService, txClient);
+ client.start(); // TephraTransactionClient.start() calls zkClient.startAndWait()
+
+ return client;
}
@Override
- public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
- return new TephraTransactionTable(ctx, htable);
+ public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo) { // Builds and starts a Tephra TransactionService together with the ZooKeeper client it is given.
+ ZKClientService zkClient = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+ .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+ HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+ .build(),
+ RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+ )
+ )
+ );
+
+ // zkClient is not started here; service.start() below starts it.
+ DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+ TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
+ TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+ TephraTransactionService service = new TephraTransactionService(zkClient, txService);
+ // txService is not started here either; service.start() below starts it.
+ service.start(); // starts zkClient first, then txService
+ return service;
+ }
+
+ static class TephraTransactionService implements PhoenixTransactionService { // Pairs a Tephra TransactionService with a ZooKeeper client so the two are started and stopped together.
+ private final ZKClientService zkClient;
+ private final TransactionService txService;
+
+ public TephraTransactionService(ZKClientService zkClient, TransactionService txService) {
+ this.zkClient = zkClient;
+ this.txService = txService;
+ }
+
+ public void start() { // note: no null checks here, unlike close()
+ zkClient.startAndWait(); // ZooKeeper client first ...
+ txService.startAndWait(); // ... then the transaction service
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (txService != null) txService.stopAndWait(); // stopped in reverse start order
+ } finally {
+ if (zkClient != null) zkClient.stopAndWait(); // in finally: still runs if stopping txService throws
+ }
+ }
+
+ }
- @Override
- public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
- return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ static class TephraTransactionClient implements PhoenixTransactionClient { // Holds a Tephra TransactionSystemClient and, when one is supplied, the ZooKeeper client to start/stop with it.
+ private final ZKClientService zkClient; // null when built through the single-argument constructor
+ private final TransactionSystemClient txClient;
+
+ public TephraTransactionClient(TransactionSystemClient txClient) { // for clients that have no ZooKeeper client
+ this(null, txClient);
+ }
+
+ public TephraTransactionClient(ZKClientService zkClient, TransactionSystemClient txClient) {
+ this.zkClient = zkClient;
+ this.txClient = txClient;
+ }
+
+ public void start() {
+ if (zkClient != null) zkClient.startAndWait(); // nothing to start when no ZooKeeper client was supplied
+ }
+
+ public TransactionSystemClient getTransactionClient() {
+ return txClient;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (zkClient != null) zkClient.stopAndWait(); // guard: avoids a NullPointerException when zkClient is null
+ }
+
+ }
@Override
- public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
- return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ public Class<? extends RegionObserver> getCoprocessor() { // Returns the RegionObserver class for this provider.
+ return TephraTransactionalProcessor.class; // the class object itself; no instance is created here
+ }
+
+ @Override
+ public Provider getProvider() { // Identifies which TransactionFactory.Provider this class implements.
+ return TransactionFactory.Provider.TEPHRA; // constant: always the TEPHRA provider
+ }
+
+ @Override
+ public boolean isUnsupported(Feature feature) { // Reports whether the given feature is unsupported by this provider.
+ return false; // unconditional: the feature argument is ignored and nothing is reported as unsupported
+ }
}