You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/13 22:40:01 UTC
[12/20] phoenix git commit: PHOENIX-4605 Support running multiple
transaction providers
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
deleted file mode 100644
index c191d8d..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
-public class TephraTransactionTable implements PhoenixTransactionalTable {
-
- private TransactionAwareHTable transactionAwareHTable;
-
- private TephraTransactionContext tephraTransactionContext;
-
- public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
- this(ctx, hTable, null);
- }
-
- public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) {
-
- assert(ctx instanceof TephraTransactionContext);
-
- tephraTransactionContext = (TephraTransactionContext) ctx;
-
- transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != null && pTable.isImmutableRows()) ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
-
- tephraTransactionContext.addTransactionAware(transactionAwareHTable);
-
- if (pTable != null && pTable.getType() != PTableType.INDEX) {
- tephraTransactionContext.markDMLFence(pTable);
- }
- }
-
- @Override
- public Result get(Get get) throws IOException {
- return transactionAwareHTable.get(get);
- }
-
- @Override
- public void put(Put put) throws IOException {
- transactionAwareHTable.put(put);
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
- transactionAwareHTable.delete(delete);
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- return transactionAwareHTable.getScanner(scan);
- }
-
- @Override
- public byte[] getTableName() {
- return transactionAwareHTable.getTableName();
- }
-
- @Override
- public Configuration getConfiguration() {
- return transactionAwareHTable.getConfiguration();
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- return transactionAwareHTable.getTableDescriptor();
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- return transactionAwareHTable.exists(get);
- }
-
- @Override
- public Result[] get(List<Get> gets) throws IOException {
- return transactionAwareHTable.get(gets);
- }
-
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- return transactionAwareHTable.getScanner(family);
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier)
- throws IOException {
- return transactionAwareHTable.getScanner(family, qualifier);
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
- transactionAwareHTable.put(puts);
- }
-
- @Override
- public void delete(List<Delete> deletes) throws IOException {
- transactionAwareHTable.delete(deletes);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush) {
- transactionAwareHTable.setAutoFlush(autoFlush);
- }
-
- @Override
- public boolean isAutoFlush() {
- return transactionAwareHTable.isAutoFlush();
- }
-
- @Override
- public long getWriteBufferSize() {
- return transactionAwareHTable.getWriteBufferSize();
- }
-
- @Override
- public void setWriteBufferSize(long writeBufferSize) throws IOException {
- transactionAwareHTable.setWriteBufferSize(writeBufferSize);
- }
-
- @Override
- public void flushCommits() throws IOException {
- transactionAwareHTable.flushCommits();
- }
-
- @Override
- public void close() throws IOException {
- transactionAwareHTable.close();
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family,
- byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
- }
-
- @Override
- public Boolean[] exists(List<Get> gets) throws IOException {
- return transactionAwareHTable.exists(gets);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail);
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- transactionAwareHTable.setAutoFlush(autoFlush);
- }
-
- @Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- return transactionAwareHTable.getRowOrBefore(row, family);
- }
-
- @Override
- public TableName getName() {
- return transactionAwareHTable.getName();
- }
-
- @Override
- public boolean[] existsAll(List<Get> gets) throws IOException {
- return transactionAwareHTable.existsAll(gets);
- }
-
- @Override
- public void batch(List<? extends Row> actions, Object[] results)
- throws IOException, InterruptedException {
- transactionAwareHTable.batch(actions, results);
- }
-
- @Override
- public Object[] batch(List<? extends Row> actions) throws IOException,
- InterruptedException {
- return transactionAwareHTable.batch(actions);
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> actions,
- Object[] results, Callback<R> callback) throws IOException,
- InterruptedException {
- transactionAwareHTable.batchCallback(actions, results, callback);
- }
-
- @Override
- public <R> Object[] batchCallback(List<? extends Row> actions,
- Callback<R> callback) throws IOException, InterruptedException {
- return transactionAwareHTable.batchCallback(actions, callback);
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Put put) throws IOException {
- return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put);
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, Put put) throws IOException {
- return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put);
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Delete delete) throws IOException {
- return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete);
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, Delete delete)
- throws IOException {
- return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
- }
-
- @Override
- public void mutateRow(RowMutations rm) throws IOException {
- transactionAwareHTable.mutateRow(rm);
- }
-
- @Override
- public Result append(Append append) throws IOException {
- return transactionAwareHTable.append(append);
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- return transactionAwareHTable.increment(increment);
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family,
- byte[] qualifier, long amount) throws IOException {
- return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount);
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family,
- byte[] qualifier, long amount, Durability durability)
- throws IOException {
- return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability);
- }
-
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] row) {
- return transactionAwareHTable.coprocessorService(row);
- }
-
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(
- Class<T> service, byte[] startKey, byte[] endKey,
- Call<T, R> callable) throws ServiceException, Throwable {
- return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable);
- }
-
- @Override
- public <T extends Service, R> void coprocessorService(Class<T> service,
- byte[] startKey, byte[] endKey, Call<T, R> callable,
- Callback<R> callback) throws ServiceException, Throwable {
- transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback);
- }
-
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(
- MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype)
- throws ServiceException, Throwable {
- return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
- }
-
- @Override
- public <R extends Message> void batchCoprocessorService(
- MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype,
- Callback<R> callback) throws ServiceException, Throwable {
- transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
- }
-
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareOp compareOp, byte[] value, RowMutations mutation)
- throws IOException {
- return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
- }
-
- @Override
- public void setOperationTimeout(int i) {
-// transactionAwareHTable.setOperationTimeout(i);
- }
-
- @Override
- public int getOperationTimeout() {
- return 0; //transactionAwareHTable.getOperationTimeout();
- }
-
- @Override
- public void setRpcTimeout(int i) {
-// transactionAwareHTable.setRpcTimeout(i);
- }
-
- @Override
- public int getRpcTimeout() {
- return 0; //transactionAwareHTable.getRpcTimeout();
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index f32764b..62bd808 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -17,24 +17,55 @@
*/
package org.apache.phoenix.transaction;
+import java.io.IOException;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+
+
public class TransactionFactory {
- enum TransactionProcessor {
- Tephra,
- Omid
+ public enum Provider {
+ TEPHRA((byte)1, TephraTransactionProvider.getInstance()),
+ OMID((byte)2, OmidTransactionProvider.getInstance());
+
+ private final byte code;
+ private final PhoenixTransactionProvider provider;
+
+ Provider(byte code, PhoenixTransactionProvider provider) {
+ this.code = code;
+ this.provider = provider;
+ }
+
+ public byte getCode() {
+ return this.code;
+ }
+
+ public static Provider fromCode(int code) { // maps a 1-based provider code back to its enum constant
+ if (code < 1 || code > Provider.values().length) { // rejects anything outside 1..number of providers
+ throw new IllegalArgumentException("Invalid TransactionFactory.Provider " + code);
+ }
+ return Provider.values()[code-1]; // positional lookup: relies on codes being 1..N in declaration order (TEPHRA=1, OMID=2)
+ }
+
+ public static Provider getDefault() {
+ return TEPHRA;
+ }
+
+ public PhoenixTransactionProvider getTransactionProvider() {
+ return provider;
+ }
}
- static public TransactionProvider getTransactionProvider() {
- return TephraTransactionProvider.getInstance();
+ public static PhoenixTransactionProvider getTransactionProvider(Provider provider) {
+ return provider.getTransactionProvider();
}
- static public TransactionProvider getTransactionProvider(TransactionProcessor processor) {
- switch (processor) {
- case Tephra:
- return TephraTransactionProvider.getInstance();
- case Omid:
- return OmidTransactionProvider.getInstance();
- default:
- throw new IllegalArgumentException("Unknown transaction processor: " + processor);
+ public static PhoenixTransactionContext getTransactionContext(byte[] txState, int clientVersion) throws IOException { // rebuilds a transaction context from serialized state; returns null when txState is null or empty
+ if (txState == null || txState.length == 0) {
+ return null;
}
+ Provider provider = (clientVersion < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0)
+ ? Provider.getDefault() // older clients: the trailing byte is not read as a provider code (presumably they do not append one - confirm), so use the default provider instead of a hard-coded one
+ : Provider.fromCode(txState[txState.length-1]); // newer clients: the last byte of txState is the provider code
+ return provider.getTransactionProvider().getTransactionContext(txState); // the chosen provider deserializes the state
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
deleted file mode 100644
index a5704f1..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
-public interface TransactionProvider {
- public PhoenixTransactionContext getTransactionContext();
- public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException;
- public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection);
- public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask);
-
- public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable);
-
- public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp);
- public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp);
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 1c25c33..6cf6e56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -88,7 +88,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.transaction.TransactionFactory;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -1515,7 +1514,7 @@ public class PhoenixRuntime {
* @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp.
*/
public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) {
- return TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
+ return TransactionUtil.isTransactionalTimestamp(tsOfCell) ? TransactionUtil.convertToMilliseconds(tsOfCell) : tsOfCell;
}
public static long getCurrentScn(ReadOnlyProps props) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index dd885fd..996e1dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.KeyRange.Bound;
@@ -88,6 +89,8 @@ import com.google.common.collect.Lists;
*/
public class ScanUtil {
public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
+ public static final int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0);
+
/*
* Max length that we fill our key when we turn an inclusive key
* into a exclusive key.
@@ -930,5 +933,17 @@ public class ScanUtil {
public static boolean isIndexRebuild(Scan scan) {
return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null;
}
+
+ public static int getClientVersion(Scan scan) { // returns the client version stored on the scan, or UNKNOWN_CLIENT_VERSION if none is set
+ int clientVersion = UNKNOWN_CLIENT_VERSION; // fallback used when the attribute is absent
+ byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ if (clientVersionBytes != null) {
+ clientVersion = Bytes.toInt(clientVersionBytes); // attribute is decoded as an int
+ }
+ return clientVersion;
+ }
+ public static void setClientVersion(Scan scan, int version) { // stores the client version on the scan under the CLIENT_VERSION attribute
+ scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(version)); // int encoded via Bytes.toBytes, the counterpart of the Bytes.toInt decode in getClientVersion
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index ab76ffe..7d6dfd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,44 +17,65 @@
*/
package org.apache.phoenix.util;
+import java.io.IOException;
import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TephraTransactionTable;
import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.tephra.util.TxUtils;
public class TransactionUtil {
+ // All transaction providers must use an empty byte array as the family delete marker
+ // (see TxConstants.FAMILY_DELETE_QUALIFIER)
+ public static final byte[] FAMILY_DELETE_MARKER = HConstants.EMPTY_BYTE_ARRAY;
+ // All transaction providers must multiply timestamps by this constant.
+ // (see TxConstants.MAX_TX_PER_MS)
+ public static final int MAX_TRANSACTIONS_PER_MILLISECOND = 1000000;
+ // Constant used to empirically determine if a timestamp is a transactional or
+ // non transactional timestamp (see TxUtils.MAX_NON_TX_TIMESTAMP)
+ private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1);
+
private TransactionUtil() {
+
}
public static boolean isTransactionalTimestamp(long ts) {
- return !TxUtils.isPreExistingVersion(ts);
+ return ts >= MAX_NON_TX_TIMESTAMP;
}
public static boolean isDelete(Cell cell) {
- return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY));
+ return CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
}
- public static long convertToNanoseconds(long serverTimeStamp) {
- return serverTimeStamp * TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
+ public static boolean isDeleteFamily(Cell cell) {
+ return CellUtil.matchingQualifier(cell, FAMILY_DELETE_MARKER) && CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
}
- public static long convertToMilliseconds(long serverTimeStamp) {
- return serverTimeStamp / TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
+ private static Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+ return CellUtil.createCell(row, family, FAMILY_DELETE_MARKER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ private static Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ public static long convertToNanoseconds(long serverTimeStamp) {
+ return serverTimeStamp * MAX_TRANSACTIONS_PER_MILLISECOND;
}
- public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) {
- return new TephraTransactionTable(phoenixTransactionContext, htable, pTable);
+ public static long convertToMilliseconds(long serverTimeStamp) {
+ return serverTimeStamp / MAX_TRANSACTIONS_PER_MILLISECOND;
}
// we resolve transactional tables at the txn read pointer
@@ -77,16 +98,58 @@ public class TransactionUtil {
return txInProgress ? convertToMilliseconds(mutationState.getInitialWritePointer()) : result.getMutationTime();
}
- public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional) throws SQLException {
+ public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional, TransactionFactory.Provider provider) throws SQLException {
Long timestamp = null;
if (!transactional) {
return timestamp;
}
MutationState mutationState = connection.getMutationState();
if (!mutationState.isTransactionStarted()) {
- mutationState.startTransaction();
+ mutationState.startTransaction(provider);
}
timestamp = convertToMilliseconds(mutationState.getInitialWritePointer());
return timestamp;
}
+
+ // Convert an HBase Delete into a Put of empty-valued marker cells so that it can be undone if the transaction is rolled back. Any other mutation, or a Delete with no convertible cells, is returned unchanged.
+ public static Mutation convertIfDelete(Mutation mutation) throws IOException {
+ if (mutation instanceof Delete) {
+ Put deleteMarker = null; // created lazily; stays null if no cell is converted
+ for (Map.Entry<byte[],List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
+ byte[] family = entry.getKey();
+ List<Cell> familyCells = entry.getValue();
+ if (familyCells.size() == 1) { // NOTE(review): with exactly one cell only a family delete is converted; a lone column delete is skipped - confirm this is intended
+ if (CellUtil.isDeleteFamily(familyCells.get(0))) {
+ if (deleteMarker == null) {
+ deleteMarker = new Put(mutation.getRow());
+ }
+ deleteMarker.add(newDeleteFamilyMarker( // family-level marker at the original cell's timestamp
+ deleteMarker.getRow(),
+ family,
+ familyCells.get(0).getTimestamp()));
+ }
+ } else {
+ for (Cell cell : familyCells) {
+ if (CellUtil.isDeleteColumns(cell)) { // cells that are not column deletes are ignored
+ if (deleteMarker == null) {
+ deleteMarker = new Put(mutation.getRow());
+ }
+ deleteMarker.add(newDeleteColumnMarker( // per-column marker keeping the cell's qualifier and timestamp
+ deleteMarker.getRow(),
+ family,
+ CellUtil.cloneQualifier(cell),
+ cell.getTimestamp()));
+ }
+ }
+ }
+ }
+ if (deleteMarker != null) {
+ for (Map.Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) { // carry the original mutation's attributes over to the replacement Put
+ deleteMarker.setAttribute(entry.getKey(), entry.getValue());
+ }
+ mutation = deleteMarker; // the Put replaces the Delete only when at least one marker was added
+ }
+ }
+ return mutation;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 76757b0..d88a915 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -56,7 +56,6 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
@@ -64,8 +63,8 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.EncodedCQCounter;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
@@ -262,7 +261,7 @@ public class CorrelatePlanTest {
PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
null, null, columns, null, null, Collections.<PTable>emptyList(),
false, Collections.<PName>emptyList(), null, null, false, false, false, null,
- null, null, true, false, 0, 0L, Boolean.FALSE, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true);
+ null, null, true, null, 0, 0L, Boolean.FALSE, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true);
TableRef sourceTable = new TableRef(pTable);
List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
for (PColumn column : sourceTable.getTable().getColumns()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
index 1a7132c..017e6c8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java
@@ -50,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
@@ -58,11 +57,11 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.junit.Test;
@@ -183,7 +182,7 @@ public class LiteralResultIteratorPlanTest {
PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null,
MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false,
- false, null, null, null, true, false, 0, 0L, false, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true);
+ false, null, null, null, true, null, 0, 0L, false, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true);
TableRef sourceTable = new TableRef(pTable);
List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
for (PColumn column : sourceTable.getTable().getColumns()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 580becb..0ea63e7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -130,7 +130,6 @@ import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ConfigUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -168,7 +167,6 @@ public abstract class BaseTest {
private static final Map<String,String> tableDDLMap;
private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
- protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
private static final int dropTableTimeout = 300; // 5 mins should be long enough.
@@ -414,18 +412,6 @@ public abstract class BaseTest {
return url;
}
- private static void tearDownTxManager() throws SQLException {
- TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager();
- }
-
- protected static void setTxnConfigs() throws IOException {
- TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
- }
-
- protected static void setupTxManager() throws SQLException, IOException {
- TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config, getUrl());
- }
-
private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
if (!clusterInitialized) {
url = setUpTestCluster(config, serverProps);
@@ -434,10 +420,6 @@ public abstract class BaseTest {
return url;
}
- private static void checkTxManagerInitialized(ReadOnlyProps clientProps) throws SQLException, IOException {
- setupTxManager();
- }
-
/**
* Set up the test hbase cluster.
* @return url to be used by clients to connect to the cluster.
@@ -476,11 +458,6 @@ public abstract class BaseTest {
final HBaseTestingUtility u = utility;
try {
destroyDriver();
- try {
- tearDownTxManager();
- } catch (Throwable t) {
- logger.error("Exception caught when shutting down tx manager", t);
- }
utility = null;
clusterInitialized = false;
} finally {
@@ -519,9 +496,7 @@ public abstract class BaseTest {
protected static void setUpTestDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
if (driver == null) {
- setTxnConfigs();
String url = checkClusterInitialized(serverProps);
- checkTxManagerInitialized(serverProps);
driver = initAndRegisterTestDriver(url, clientProps);
}
}
@@ -593,6 +568,7 @@ public abstract class BaseTest {
conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY);
conf.setLong(HConstants.ZK_SESSION_TIMEOUT, 10 * HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
conf.setLong(HConstants.ZOOKEEPER_TICK_TIME, 6 * 1000);
+
// override any defaults based on overrideProps
for (Entry<String,String> entry : overrideProps) {
conf.set(entry.getKey(), entry.getValue());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index c93e56e..a7569f7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -20,9 +20,12 @@ package org.apache.phoenix.query;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY;
import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
+import org.apache.curator.shaded.com.google.common.io.Files;
import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.tephra.TxConstants;
+import org.apache.twill.internal.utils.Networks;
/**
@@ -69,6 +72,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
* because we want to control it's execution ourselves
*/
public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = Long.MAX_VALUE;
+ public static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
/**
@@ -117,7 +121,16 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
.setHConnectionPoolMaxSize(DEFAULT_HCONNECTION_POOL_MAX_SIZE)
.setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS)
.setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME)
- .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY);
+ .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY)
+ // setup default configs for Tephra
+ .set(TxConstants.Manager.CFG_DO_PERSIST, false)
+ .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times")
+ .set(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1)
+ .set(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort())
+ .set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, Files.createTempDir().getAbsolutePath())
+ .set(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS)
+ .set(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L)
+ ;
}
public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 1ec07b6..a06fd69 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -780,7 +780,7 @@ public class TestUtil {
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
MutationState mutationState = pconn.getMutationState();
if (table.isTransactional()) {
- mutationState.startTransaction();
+ mutationState.startTransaction(table.getTransactionProvider());
}
try (HTableInterface htable = mutationState.getHTable(table)) {
byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index ba9e0b4..16381dd 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -100,6 +100,7 @@ message PTable {
optional bytes encodingScheme = 35;
repeated EncodedCQCounter encodedCQCounters = 36;
optional bool useStatsForParallelization = 37;
+ optional int32 transactionProvider = 38;
}
message EncodedCQCounter {