You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/13 22:40:03 UTC
[14/20] phoenix git commit: PHOENIX-4605 Support running multiple
transaction providers
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index c5065e0..59c10ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -19,120 +19,7 @@ package org.apache.phoenix.expression;
import java.util.Map;
-import org.apache.phoenix.expression.function.AbsFunction;
-import org.apache.phoenix.expression.function.ArrayAllComparisonExpression;
-import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression;
-import org.apache.phoenix.expression.function.ArrayAppendFunction;
-import org.apache.phoenix.expression.function.ArrayConcatFunction;
-import org.apache.phoenix.expression.function.ArrayElemRefExpression;
-import org.apache.phoenix.expression.function.ArrayFillFunction;
-import org.apache.phoenix.expression.function.ArrayIndexFunction;
-import org.apache.phoenix.expression.function.ArrayLengthFunction;
-import org.apache.phoenix.expression.function.ArrayPrependFunction;
-import org.apache.phoenix.expression.function.ArrayRemoveFunction;
-import org.apache.phoenix.expression.function.ArrayToStringFunction;
-import org.apache.phoenix.expression.function.ByteBasedRegexpReplaceFunction;
-import org.apache.phoenix.expression.function.ByteBasedRegexpSplitFunction;
-import org.apache.phoenix.expression.function.ByteBasedRegexpSubstrFunction;
-import org.apache.phoenix.expression.function.CbrtFunction;
-import org.apache.phoenix.expression.function.CeilDateExpression;
-import org.apache.phoenix.expression.function.CeilDecimalExpression;
-import org.apache.phoenix.expression.function.CeilFunction;
-import org.apache.phoenix.expression.function.CeilMonthExpression;
-import org.apache.phoenix.expression.function.CeilTimestampExpression;
-import org.apache.phoenix.expression.function.CeilWeekExpression;
-import org.apache.phoenix.expression.function.CeilYearExpression;
-import org.apache.phoenix.expression.function.CoalesceFunction;
-import org.apache.phoenix.expression.function.CollationKeyFunction;
-import org.apache.phoenix.expression.function.ConvertTimezoneFunction;
-import org.apache.phoenix.expression.function.CountAggregateFunction;
-import org.apache.phoenix.expression.function.DayOfMonthFunction;
-import org.apache.phoenix.expression.function.DayOfWeekFunction;
-import org.apache.phoenix.expression.function.DayOfYearFunction;
-import org.apache.phoenix.expression.function.DecodeFunction;
-import org.apache.phoenix.expression.function.DefaultValueExpression;
-import org.apache.phoenix.expression.function.DistinctCountAggregateFunction;
-import org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction;
-import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
-import org.apache.phoenix.expression.function.EncodeFunction;
-import org.apache.phoenix.expression.function.ExpFunction;
-import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
-import org.apache.phoenix.expression.function.FirstValueFunction;
-import org.apache.phoenix.expression.function.FirstValuesFunction;
-import org.apache.phoenix.expression.function.FloorDateExpression;
-import org.apache.phoenix.expression.function.FloorDecimalExpression;
-import org.apache.phoenix.expression.function.FloorFunction;
-import org.apache.phoenix.expression.function.FloorMonthExpression;
-import org.apache.phoenix.expression.function.FloorWeekExpression;
-import org.apache.phoenix.expression.function.FloorYearExpression;
-import org.apache.phoenix.expression.function.GetBitFunction;
-import org.apache.phoenix.expression.function.GetByteFunction;
-import org.apache.phoenix.expression.function.HourFunction;
-import org.apache.phoenix.expression.function.IndexStateNameFunction;
-import org.apache.phoenix.expression.function.InstrFunction;
-import org.apache.phoenix.expression.function.InvertFunction;
-import org.apache.phoenix.expression.function.LTrimFunction;
-import org.apache.phoenix.expression.function.LastValueFunction;
-import org.apache.phoenix.expression.function.LastValuesFunction;
-import org.apache.phoenix.expression.function.LengthFunction;
-import org.apache.phoenix.expression.function.LnFunction;
-import org.apache.phoenix.expression.function.LogFunction;
-import org.apache.phoenix.expression.function.LowerFunction;
-import org.apache.phoenix.expression.function.LpadFunction;
-import org.apache.phoenix.expression.function.MD5Function;
-import org.apache.phoenix.expression.function.MaxAggregateFunction;
-import org.apache.phoenix.expression.function.MinAggregateFunction;
-import org.apache.phoenix.expression.function.MinuteFunction;
-import org.apache.phoenix.expression.function.MonthFunction;
-import org.apache.phoenix.expression.function.NowFunction;
-import org.apache.phoenix.expression.function.NthValueFunction;
-import org.apache.phoenix.expression.function.OctetLengthFunction;
-import org.apache.phoenix.expression.function.PercentRankAggregateFunction;
-import org.apache.phoenix.expression.function.PercentileContAggregateFunction;
-import org.apache.phoenix.expression.function.PercentileDiscAggregateFunction;
-import org.apache.phoenix.expression.function.PowerFunction;
-import org.apache.phoenix.expression.function.RTrimFunction;
-import org.apache.phoenix.expression.function.RandomFunction;
-import org.apache.phoenix.expression.function.RegexpReplaceFunction;
-import org.apache.phoenix.expression.function.RegexpSplitFunction;
-import org.apache.phoenix.expression.function.RegexpSubstrFunction;
-import org.apache.phoenix.expression.function.ReverseFunction;
-import org.apache.phoenix.expression.function.RoundDateExpression;
-import org.apache.phoenix.expression.function.RoundDecimalExpression;
-import org.apache.phoenix.expression.function.RoundFunction;
-import org.apache.phoenix.expression.function.RoundMonthExpression;
-import org.apache.phoenix.expression.function.RoundTimestampExpression;
-import org.apache.phoenix.expression.function.RoundWeekExpression;
-import org.apache.phoenix.expression.function.RoundYearExpression;
-import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
-import org.apache.phoenix.expression.function.SQLTableTypeFunction;
-import org.apache.phoenix.expression.function.SQLViewTypeFunction;
-import org.apache.phoenix.expression.function.SecondFunction;
-import org.apache.phoenix.expression.function.SetBitFunction;
-import org.apache.phoenix.expression.function.SetByteFunction;
-import org.apache.phoenix.expression.function.SignFunction;
-import org.apache.phoenix.expression.function.SqlTypeNameFunction;
-import org.apache.phoenix.expression.function.SqrtFunction;
-import org.apache.phoenix.expression.function.StddevPopFunction;
-import org.apache.phoenix.expression.function.StddevSampFunction;
-import org.apache.phoenix.expression.function.StringBasedRegexpReplaceFunction;
-import org.apache.phoenix.expression.function.StringBasedRegexpSplitFunction;
-import org.apache.phoenix.expression.function.StringBasedRegexpSubstrFunction;
-import org.apache.phoenix.expression.function.StringToArrayFunction;
-import org.apache.phoenix.expression.function.SubstrFunction;
-import org.apache.phoenix.expression.function.SumAggregateFunction;
-import org.apache.phoenix.expression.function.TimezoneOffsetFunction;
-import org.apache.phoenix.expression.function.ToCharFunction;
-import org.apache.phoenix.expression.function.ToDateFunction;
-import org.apache.phoenix.expression.function.ToNumberFunction;
-import org.apache.phoenix.expression.function.ToTimeFunction;
-import org.apache.phoenix.expression.function.ToTimestampFunction;
-import org.apache.phoenix.expression.function.TrimFunction;
-import org.apache.phoenix.expression.function.TruncFunction;
-import org.apache.phoenix.expression.function.UDFExpression;
-import org.apache.phoenix.expression.function.UpperFunction;
-import org.apache.phoenix.expression.function.WeekFunction;
-import org.apache.phoenix.expression.function.YearFunction;
+import org.apache.phoenix.expression.function.*;
import com.google.common.collect.Maps;
@@ -298,7 +185,9 @@ public enum ExpressionType {
LastValuesFunction(LastValuesFunction.class),
DistinctCountHyperLogLogAggregateFunction(DistinctCountHyperLogLogAggregateFunction.class),
CollationKeyFunction(CollationKeyFunction.class),
- ArrayRemoveFunction(ArrayRemoveFunction.class);
+ ArrayRemoveFunction(ArrayRemoveFunction.class),
+ TransactionProviderNameFunction(TransactionProviderNameFunction.class),
+ ;
ExpressionType(Class<? extends Expression> clazz) {
this.clazz = clazz;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java
new file mode 100644
index 0000000..0117c1f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PTinyint;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.TransactionFactory;
+
+
+/**
+ *
+ * Function used to get the index state name from the serialized byte value
+ * Usage:
+ * IndexStateName('a')
+ * will return 'ACTIVE'
+ *
+ *
+ * @since 2.1
+ */
+@BuiltInFunction(name=TransactionProviderNameFunction.NAME, args= {
+ @Argument(allowedTypes= PInteger.class)} )
+public class TransactionProviderNameFunction extends ScalarFunction {
+ public static final String NAME = "TransactionProviderName";
+
+ public TransactionProviderNameFunction() {
+ }
+
+ public TransactionProviderNameFunction(List<Expression> children) throws SQLException {
+ super(children);
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ Expression child = children.get(0);
+ if (!child.evaluate(tuple, ptr)) {
+ return false;
+ }
+ if (ptr.getLength() == 0) {
+ return true;
+ }
+ int code = PTinyint.INSTANCE.getCodec().decodeByte(ptr, child.getSortOrder());
+ TransactionFactory.Provider provider = TransactionFactory.Provider.fromCode(code);
+ ptr.set(PVarchar.INSTANCE.toBytes(provider.name()));
+ return true;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PVarchar.INSTANCE;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 15d8ac3..2f41dc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -101,7 +101,6 @@ import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.tuple.BaseTuple;
import org.apache.phoenix.schema.tuple.ValueGetterTuple;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.BitSet;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -109,6 +108,7 @@ import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
import com.google.common.base.Preconditions;
@@ -1068,7 +1068,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor
- || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionProvider().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
+ || TransactionUtil.isDeleteFamily(kv)) {
nDeleteCF++;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 94fbd0d..778401e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -52,7 +52,7 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
final PhoenixTransactionContext txnContext;
try {
- txnContext = txState.length != 0 ? TransactionFactory.getTransactionProvider().getTransactionContext(txState) : null;
+ txnContext = TransactionFactory.getTransactionContext(txState, clientVersion);
} catch (IOException e) {
throw new SQLException(e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index c5233d3..d33e3fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -44,7 +44,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
public static final String INDEX_PROTO_MD = "IdxProtoMD";
public static final String INDEX_UUID = "IdxUUID";
public static final String INDEX_MAINTAINERS = "IndexMaintainers";
- public static final String CLIENT_VERSION = "_ClientVersion";
public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
private byte[] regionStartKey;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
index 5e6f756..949e6ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
public class PhoenixIndexMetaDataBuilder {
@@ -63,9 +64,9 @@ public class PhoenixIndexMetaDataBuilder {
boolean useProto = md != null;
byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
- final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionProvider().getTransactionContext(txState);
- byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION);
- final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+ byte[] clientVersionBytes = attributes.get(BaseScannerRegionObserver.CLIENT_VERSION);
+ final int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+ final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionContext(txState, clientVersion);
return new IndexMetaDataCache() {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index ba6a08f..cc7221e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -42,7 +42,6 @@ import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.HashJoinRegionScanner;
-import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -63,6 +62,7 @@ import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -128,12 +128,13 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
if (localIndexBytes == null) {
localIndexBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD);
}
+ int clientVersion = ScanUtil.getClientVersion(scan);
List<IndexMaintainer> indexMaintainers =
localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
indexMaintainer = indexMaintainers.get(0);
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
- tx = MutationState.decodeTransaction(txState);
+ tx = TransactionFactory.getTransactionContext(txState, clientVersion);
}
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 9caf7fb..add0628 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
import org.apache.phoenix.expression.function.SQLTableTypeFunction;
import org.apache.phoenix.expression.function.SQLViewTypeFunction;
import org.apache.phoenix.expression.function.SqlTypeNameFunction;
+import org.apache.phoenix.expression.function.TransactionProviderNameFunction;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.iterate.DelegateResultIterator;
@@ -297,6 +298,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String TRANSACTIONAL = "TRANSACTIONAL";
public static final byte[] TRANSACTIONAL_BYTES = Bytes.toBytes(TRANSACTIONAL);
+ public static final String TRANSACTION_PROVIDER = "TRANSACTION_PROVIDER";
+ public static final byte[] TRANSACTION_PROVIDER_BYTES = Bytes.toBytes(TRANSACTION_PROVIDER);
+
public static final String UPDATE_CACHE_FREQUENCY = "UPDATE_CACHE_FREQUENCY";
public static final byte[] UPDATE_CACHE_FREQUENCY_BYTES = Bytes.toBytes(UPDATE_CACHE_FREQUENCY);
@@ -1133,9 +1137,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
VIEW_STATEMENT + "," +
SQLViewTypeFunction.NAME + "(" + VIEW_TYPE + ") AS " + VIEW_TYPE + "," +
SQLIndexTypeFunction.NAME + "(" + INDEX_TYPE + ") AS " + INDEX_TYPE + "," +
- TRANSACTIONAL + "," +
+ TRANSACTION_PROVIDER + " IS NOT NULL AS " + TRANSACTIONAL + "," +
IS_NAMESPACE_MAPPED + "," +
- GUIDE_POSTS_WIDTH +
+ GUIDE_POSTS_WIDTH + "," +
+ TransactionProviderNameFunction.NAME + "(" + TRANSACTION_PROVIDER + ") AS TRANSACTION_PROVIDER" +
" from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS +
" where " + COLUMN_NAME + " is null" +
" and " + COLUMN_FAMILY + " is null" +
@@ -1175,7 +1180,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
"'' " + INDEX_TYPE + "," +
"CAST(null AS BOOLEAN) " + TRANSACTIONAL + "," +
"CAST(null AS BOOLEAN) " + IS_NAMESPACE_MAPPED + "," +
- "CAST(null AS BIGINT) " + GUIDE_POSTS_WIDTH + "\n");
+ "CAST(null AS BIGINT) " + GUIDE_POSTS_WIDTH + "," +
+ "CAST(null AS VARCHAR) " + TRANSACTION_PROVIDER + "\n");
buf.append(
" from " + SYSTEM_SEQUENCE + "\n");
StringBuilder whereClause = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f526419..25b9fb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -405,7 +405,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
MutationState state = connection.getMutationState();
MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null && plan.getTargetRef().getTable().isTransactional()) {
- state.startTransaction();
+ state.startTransaction(plan.getTargetRef().getTable().getTransactionProvider());
}
Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
state.sendUncommitted(tableRefs);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 7c154f0..f4ecac2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -116,7 +116,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
put = new Put(CellUtil.cloneRow(cell));
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
- put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion);
+ put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersion);
put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
mutations.add(put);
}
@@ -126,7 +126,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
del = new Delete(CellUtil.cloneRow(cell));
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
- del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion);
+ del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersion);
del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
mutations.add(del);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 0b72ada..b75119b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -48,6 +48,8 @@ import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
+import org.apache.phoenix.transaction.TransactionFactory;
public interface ConnectionQueryServices extends QueryServices, MetaDataMutated {
@@ -152,4 +154,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public User getUser();
public QueryLoggerDisruptor getQueryDisruptor();
-}
\ No newline at end of file
+
+ public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6627a84..5cb14d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
@@ -138,10 +139,10 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
import org.apache.phoenix.coprocessor.SequenceRegionObserver;
import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
@@ -226,8 +227,10 @@ import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PUnsignedTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
@@ -264,7 +267,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
private static final int TTL_FOR_MUTEX = 15 * 60; // 15min
protected final Configuration config;
- private final ConnectionInfo connectionInfo;
+ protected final ConnectionInfo connectionInfo;
// Copy of config.getProps(), but read-only to prevent synchronization that we
// don't need.
private final ReadOnlyProps props;
@@ -306,6 +309,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// List of queues instead of a single queue to provide reduced contention via lock striping
private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues;
private ScheduledExecutorService renewLeaseExecutor;
+ private PhoenixTransactionClient[] txClients = new PhoenixTransactionClient[TransactionFactory.Provider.values().length];;
/*
* We can have multiple instances of ConnectionQueryServices. By making the thread factory
* static, renew lease thread names will be unique across them.
@@ -410,23 +414,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
- private void initTxServiceClient() {
- txZKClientService = TransactionFactory.getTransactionProvider().getTransactionContext().setTransactionClient(config, props, connectionInfo);
- }
-
private void openConnection() throws SQLException {
try {
- boolean transactionsEnabled = props.getBoolean(
- QueryServices.TRANSACTIONS_ENABLED,
- QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
GLOBAL_HCONNECTIONS_COUNTER.increment();
logger.info("HConnection established. Stacktrace for informational purposes: " + connection + " " + LogUtil.getCallerStackTrace());
- // only initialize the tx service client if needed and if we succeeded in getting a connection
- // to HBase
- if (transactionsEnabled) {
- initTxServiceClient();
- }
} catch (IOException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
.setRootCause(e).build().buildException();
@@ -517,7 +509,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
renewLeaseExecutor.shutdownNow();
}
// shut down the tx client service if we created one to support transactions
- if (this.txZKClientService != null) this.txZKClientService.stopAndWait();
+ for (PhoenixTransactionClient client : txClients) {
+ if (client != null) {
+ client.close();
+ }
+ }
}
} catch (IOException e) {
if (sqlE == null) {
@@ -858,9 +854,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
}
+ // For ALTER TABLE
+ boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
boolean isTransactional =
- Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
- Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); // For ALTER TABLE
+ Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx;
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -923,13 +920,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
if (isTransactional) {
- if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
- descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null);
+ TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
+ if (provider == null) {
+ String providerValue = this.props.get(QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER);
+ provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(providerValue);
+ }
+ Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
+ if (!descriptor.hasCoprocessor(coprocessorClass.getName())) {
+ descriptor.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null);
}
} else {
- // If exception on alter table to transition back to non transactional
- if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
- descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName());
+ // Remove all potential transactional coprocessors
+ for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
+ Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
+ if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) {
+ descriptor.removeCoprocessor(coprocessorClass.getName());
+ }
}
}
} catch (IOException e) {
@@ -1126,7 +1132,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
// If we think we're creating a non transactional table when it's already
// transactional, don't allow.
- if (existingDesc.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
+ if (existingDesc.hasCoprocessor(TephraTransactionalProcessor.class.getName())) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
.setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
.setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
@@ -2895,6 +2901,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
+ PBoolean.INSTANCE.getSqlTypeName());
addParentToChildLinks(metaConnection);
}
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) {
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0,
+ PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + " "
+ + PTinyint.INSTANCE.getSqlTypeName());
+ }
}
@@ -4080,7 +4094,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public String getUserName() {
return userName;
}
-
+
@Override
public User getUser() {
return user;
@@ -4515,4 +4529,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public QueryLoggerDisruptor getQueryDisruptor() {
return this.queryDisruptor;
}
+
+ @Override
+ public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) {
+ PhoenixTransactionClient client = txClients[provider.ordinal()];
+ if (client == null) {
+ client = txClients[provider.ordinal()] = provider.getTransactionProvider().getTransactionClient(config, connectionInfo);
+ }
+ return client;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index ad354d1..aa8209d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -82,7 +82,8 @@ import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
-import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.MetaDataUtil;
@@ -116,7 +117,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
private final Configuration config;
private User user;
-
+
public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) {
super(services);
userName = connInfo.getPrincipal();
@@ -141,7 +142,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
- TransactionFactory.getTransactionProvider().getTransactionContext().setInMemoryTransactionClient(config);
this.guidePostsCache = new GuidePostsCache(this, config);
}
@@ -682,4 +682,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public QueryLoggerDisruptor getQueryDisruptor() {
return null;
}
+
+ @Override
+ public PhoenixTransactionClient initTransactionClient(Provider provider) {
+ return null; // Client is not necessary
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index f5c8a59..ed9b9da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -49,6 +49,8 @@ import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices {
@@ -363,6 +365,11 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public QueryLoggerDisruptor getQueryDisruptor() {
return getDelegate().getQueryDisruptor();
}
-
-
-}
\ No newline at end of file
+
+
+
+ @Override
+ public PhoenixTransactionClient initTransactionClient(Provider provider) {
+ return getDelegate().initTransactionClient(provider);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ae12e01..d181fc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -100,6 +100,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
@@ -339,6 +340,7 @@ public interface QueryConstants {
ENCODING_SCHEME + " TINYINT, " +
COLUMN_QUALIFIER_COUNTER + " INTEGER, " +
USE_STATS_FOR_PARALLELIZATION + " BOOLEAN, " +
+ TRANSACTION_PROVIDER + " TINYINT, " +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
+ TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 21f043c..29d18d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -207,6 +207,7 @@ public interface QueryServices extends SQLCloseable {
public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default";
+ public static final String DEFAULT_TRANSACTION_PROVIDER_ATTRIB = "phoenix.table.transaction.provider.default";
public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled";
// Transaction related configs
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 58c9812..70ac11b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -114,6 +114,7 @@ import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableRefFactory;
import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -263,6 +264,7 @@ public class QueryServicesOptions {
// We'll also need this for transactions to work correctly
public static final boolean DEFAULT_AUTO_COMMIT = false;
public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false;
+ public static final String DEFAULT_TRANSACTION_PROVIDER = TransactionFactory.Provider.getDefault().name();
public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false;
public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
@@ -554,22 +556,22 @@ public class QueryServicesOptions {
return set(GROUPBY_SPILL_FILES_ATTRIB, num);
}
- private QueryServicesOptions set(String name, boolean value) {
+ QueryServicesOptions set(String name, boolean value) {
config.set(name, Boolean.toString(value));
return this;
}
- private QueryServicesOptions set(String name, int value) {
+ QueryServicesOptions set(String name, int value) {
config.set(name, Integer.toString(value));
return this;
}
- private QueryServicesOptions set(String name, String value) {
+ QueryServicesOptions set(String name, String value) {
config.set(name, value);
return this;
}
- private QueryServicesOptions set(String name, long value) {
+ QueryServicesOptions set(String name, long value) {
config.set(name, Long.toString(value));
return this;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 8f15c5e..d1b8f1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.transaction.TransactionFactory;
public class DelegateTable implements PTable {
@Override
@@ -237,7 +238,12 @@ public class DelegateTable implements PTable {
}
@Override
- public boolean isTransactional() {
+ public TransactionFactory.Provider getTransactionProvider() {
+ return delegate.getTransactionProvider();
+ }
+
+ @Override
+ public final boolean isTransactional() {
return delegate.isTransactional();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index d252879..1fb668e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -83,6 +83,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION;
@@ -222,6 +223,9 @@ import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.CursorUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -280,7 +284,7 @@ public class MetaDataClient {
INDEX_TYPE + "," +
STORE_NULLS + "," +
BASE_COLUMN_COUNT + "," +
- TRANSACTIONAL + "," +
+ TRANSACTION_PROVIDER + "," +
UPDATE_CACHE_FREQUENCY + "," +
IS_NAMESPACE_MAPPED + "," +
AUTO_PARTITION_SEQ + "," +
@@ -572,14 +576,11 @@ public class MetaDataClient {
} catch (TableNotFoundException e) {
}
- boolean defaultTransactional = connection.getQueryServices().getProps().getBoolean(
- QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB,
- QueryServicesOptions.DEFAULT_TRANSACTIONAL);
// start a txn if all table are transactional by default or if we found the table in the cache and it is transactional
// TODO if system tables become transactional remove the check
- boolean isTransactional = defaultTransactional || (table!=null && table.isTransactional());
- if (!systemTable && isTransactional && !connection.getMutationState().isTransactionStarted()) {
- connection.getMutationState().startTransaction();
+ boolean isTransactional = (table!=null && table.isTransactional());
+ if (isTransactional) {
+ connection.getMutationState().startTransaction(table.getTransactionProvider());
}
resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
// Do not make rpc to getTable if
@@ -632,16 +633,20 @@ public class MetaDataClient {
result =
queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp,
resolvedTimestamp);
- // if the table was assumed to be transactional, but is actually not transactional
- // then re-resolve as of the right timestamp (and vice versa)
- if (table == null && result.getTable() != null
- && result.getTable().isTransactional() != isTransactional) {
- result =
- queryServices.getTable(tenantId, schemaBytes, tableBytes,
- tableTimestamp,
- TransactionUtil.getResolvedTimestamp(connection,
- result.getTable().isTransactional(),
- HConstants.LATEST_TIMESTAMP));
+ // if the table was assumed to be non transactional, but is actually transactional
+ // then re-resolve as of the right timestamp
+ if (result.getTable() != null
+ && result.getTable().isTransactional()
+ && !isTransactional) {
+ long resolveTimestamp = TransactionUtil.getResolvedTimestamp(connection,
+ result.getTable().isTransactional(),
+ HConstants.LATEST_TIMESTAMP);
+ // Reresolve if table timestamp is past timestamp as of which we should see data
+ if (result.getTable().getTimeStamp() >= resolveTimestamp) {
+ result =
+ queryServices.getTable(tenantId, schemaBytes, tableBytes,
+ tableTimestamp, resolveTimestamp);
+ }
}
if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
@@ -1236,8 +1241,8 @@ public class MetaDataClient {
//view all the data belonging to the table
PTable nonTxnLogicalTable = new DelegateTable(logicalTable) {
@Override
- public boolean isTransactional() {
- return false;
+ public TransactionFactory.Provider getTransactionProvider() {
+ return null;
}
};
TableRef tableRef = new TableRef(null, nonTxnLogicalTable, clientTimeStamp, false);
@@ -1861,7 +1866,7 @@ public class MetaDataClient {
long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
boolean multiTenant = false;
boolean storeNulls = false;
- boolean transactional = (parent!= null) ? parent.isTransactional() : false;
+ TransactionFactory.Provider transactionProvider = (parent!= null) ? parent.getTransactionProvider() : null;
Integer saltBucketNum = null;
String defaultFamilyName = null;
boolean isImmutableRows = false;
@@ -1877,7 +1882,7 @@ public class MetaDataClient {
QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS;
ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN;
if (parent != null && tableType == PTableType.INDEX) {
- timestamp = TransactionUtil.getTableTimestamp(connection, transactional);
+ timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
storeNulls = parent.getStoreNulls();
isImmutableRows = parent.isImmutableRows();
isAppendOnlySchema = parent.isAppendOnlySchema();
@@ -2018,31 +2023,45 @@ public class MetaDataClient {
storeNulls = storeNullsProp;
}
Boolean transactionalProp = (Boolean) TableProperty.TRANSACTIONAL.getValue(tableProps);
- if (transactionalProp != null && parent != null) {
+ TransactionFactory.Provider transactionProviderProp = (TransactionFactory.Provider) TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
+ if ((transactionalProp != null || transactionProviderProp != null) && parent != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
if (parent == null) {
- if (transactionalProp == null) {
+ boolean transactional;
+ if (transactionProviderProp != null) {
+ transactional = true;
+ } else if (transactionalProp == null) {
transactional = connection.getQueryServices().getProps().getBoolean(
QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB,
QueryServicesOptions.DEFAULT_TABLE_ISTRANSACTIONAL);
} else {
transactional = transactionalProp;
}
+ if (transactional) {
+ if (transactionProviderProp == null) {
+ transactionProvider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(
+ connection.getQueryServices().getProps().get(
+ QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB,
+ QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER));
+ } else {
+ transactionProvider = transactionProviderProp;
+ }
+ }
}
boolean transactionsEnabled = connection.getQueryServices().getProps().getBoolean(
QueryServices.TRANSACTIONS_ENABLED,
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
// can't create a transactional table if transactions are not enabled
- if (!transactionsEnabled && transactional) {
+ if (!transactionsEnabled && transactionProvider != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_IF_TXNS_DISABLED)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
// can't create a transactional table if it has a row timestamp column
- if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactional) {
+ if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactionProvider != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP)
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
@@ -2050,8 +2069,9 @@ public class MetaDataClient {
// Put potentially inferred value into tableProps as it's used by the createTable call below
// to determine which coprocessors to install on the new table.
- tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactional);
- if (transactional) {
+ tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactionProvider != null);
+ if (transactionProvider != null) {
+ // TODO: for Omid
// If TTL set, use Tephra TTL property name instead
Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL);
if (ttl != null) {
@@ -2063,7 +2083,7 @@ public class MetaDataClient {
(Boolean) TableProperty.USE_STATS_FOR_PARALLELIZATION.getValue(tableProps);
boolean sharedTable = statement.getTableType() == PTableType.VIEW || allocateIndexId;
- if (transactional) {
+ if (transactionProvider != null) {
// Tephra uses an empty value cell as its delete marker, so we need to turn on
// storeNulls for transactional tables.
// If we use regular column delete markers (which is what non transactional tables
@@ -2098,7 +2118,7 @@ public class MetaDataClient {
}
}
}
- timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional) : timestamp;
+ timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider) : timestamp;
// Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
if (sharedTable) {
@@ -2481,7 +2501,7 @@ public class MetaDataClient {
Collections.<PTable>emptyList(), isImmutableRows,
Collections.<PName>emptyList(), defaultFamilyName == null ? null :
PNameFactory.newName(defaultFamilyName), null,
- Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true);
+ Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, null, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true);
connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
}
@@ -2621,7 +2641,11 @@ public class MetaDataClient {
} else {
tableUpsert.setInt(19, BASE_TABLE_BASE_COLUMN_COUNT);
}
- tableUpsert.setBoolean(20, transactional);
+ if (transactionProvider == null) {
+ tableUpsert.setNull(20, Types.TINYINT);
+ } else {
+ tableUpsert.setByte(20, transactionProvider.getCode());
+ }
tableUpsert.setLong(21, updateCacheFrequency);
tableUpsert.setBoolean(22, isNamespaceMapped);
if (autoPartitionSeq == null) {
@@ -2746,7 +2770,7 @@ public class MetaDataClient {
PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns.values(),
parent == null ? null : parent.getSchemaName(), parent == null ? null : parent.getTableName(), Collections.<PTable>emptyList(), isImmutableRows,
physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
- result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, immutableStorageScheme, encodingScheme, cqCounterToBe, useStatsForParallelizationProp);
+ result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, immutableStorageScheme, encodingScheme, cqCounterToBe, useStatsForParallelizationProp);
result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
addTableToCache(result);
return table;
@@ -3251,8 +3275,8 @@ public class MetaDataClient {
changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName);
// If changing isImmutableRows to true or it's not being changed and is already true
boolean willBeImmutableRows = Boolean.TRUE.equals(metaPropertiesEvaluated.getIsImmutableRows()) || (metaPropertiesEvaluated.getIsImmutableRows() == null && table.isImmutableRows());
-
- Long timeStamp = TransactionUtil.getTableTimestamp(connection, table.isTransactional() || metaProperties.getNonTxToTx());
+ boolean willBeTxnl = metaProperties.getNonTxToTx();
+ Long timeStamp = TransactionUtil.getTableTimestamp(connection, table.isTransactional() || willBeTxnl, table.isTransactional() ? table.getTransactionProvider() : metaPropertiesEvaluated.getTransactionProvider());
int numPkColumnsAdded = 0;
List<PColumn> columns = Lists.newArrayListWithExpectedSize(numCols);
Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>();
@@ -4247,6 +4271,8 @@ public class MetaDataClient {
metaProperties.setStoreNullsProp((Boolean)value);
} else if (propName.equals(TRANSACTIONAL)) {
metaProperties.setIsTransactionalProp((Boolean)value);
+ } else if (propName.equals(TRANSACTION_PROVIDER)) {
+ metaProperties.setTransactionProviderProp((TransactionFactory.Provider) value);
} else if (propName.equals(UPDATE_CACHE_FREQUENCY)) {
metaProperties.setUpdateCacheFrequencyProp((Long)value);
} else if (propName.equals(GUIDE_POSTS_WIDTH)) {
@@ -4369,6 +4395,22 @@ public class MetaDataClient {
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
+ TransactionFactory.Provider provider = metaProperties.getTransactionProviderProp();
+ if (provider == null) {
+ provider = (Provider)
+ TableProperty.TRANSACTION_PROVIDER.getValue(
+ connection.getQueryServices().getProps().get(
+ QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB,
+ QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER));
+ metaPropertiesEvaluated.setTransactionProvider(provider);
+ }
+ if (provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL)
+ .setMessage(provider.name() + ". ")
+ .setSchemaName(schemaName)
+ .setTableName(tableName)
+ .build().buildException();
+ }
changingPhoenixTableProperty = true;
metaProperties.setNonTxToTx(true);
}
@@ -4381,6 +4423,7 @@ public class MetaDataClient {
private Boolean multiTenantProp = null;
private Boolean disableWALProp = null;
private Boolean storeNullsProp = null;
+ private TransactionFactory.Provider transactionProviderProp = null;
private Boolean isTransactionalProp = null;
private Long updateCacheFrequencyProp = null;
private Boolean appendOnlySchemaProp = null;
@@ -4421,6 +4464,14 @@ public class MetaDataClient {
this.storeNullsProp = storeNullsProp;
}
+ public TransactionFactory.Provider getTransactionProviderProp() {
+ return transactionProviderProp;
+ }
+
+ public void setTransactionProviderProp(TransactionFactory.Provider transactionProviderProp) {
+ this.transactionProviderProp = transactionProviderProp;
+ }
+
public Boolean getIsTransactionalProp() {
return isTransactionalProp;
}
@@ -4490,6 +4541,7 @@ public class MetaDataClient {
private Boolean storeNulls = null;
private Boolean useStatsForParallelization = null;
private Boolean isTransactional = null;
+ private TransactionFactory.Provider transactionProvider = null;
public Boolean getIsImmutableRows() {
return isImmutableRows;
@@ -4570,5 +4622,14 @@ public class MetaDataClient {
public void setIsTransactional(Boolean isTransactional) {
this.isTransactional = isTransactional;
}
+
+ public TransactionFactory.Provider getTransactionProvider() {
+ return transactionProvider;
+ }
+
+ public void setTransactionProvider(TransactionFactory.Provider transactionProvider) {
+ this.transactionProvider = transactionProvider;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 7e186ad..af78612 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;
import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
import com.google.common.annotations.VisibleForTesting;
@@ -680,6 +681,7 @@ public interface PTable extends PMetaDataEntity {
boolean isMultiTenant();
boolean getStoreNulls();
boolean isTransactional();
+ TransactionFactory.Provider getTransactionProvider();
ViewType getViewType();
String getViewStatement();