You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/03/23 19:02:13 UTC
[4/5] phoenix git commit: PHOENIX-4660 Use TransactionProvider
interface
PHOENIX-4660 Use TransactionProvider interface
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ff8cd3d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ff8cd3d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ff8cd3d
Branch: refs/heads/master
Commit: 8ff8cd3daa28ccaf8edc7fc0bc4b563ee8aa6f06
Parents: d1f015d
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 15:16:24 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Mar 23 11:47:54 2018 -0700
----------------------------------------------------------------------
.../phoenix/tx/FlappingTransactionIT.java | 8 +-
.../PhoenixTransactionalProcessor.java | 2 +-
.../apache/phoenix/execute/MutationState.java | 8 +-
.../PhoenixTxIndexMutationGenerator.java | 2 +-
.../apache/phoenix/index/IndexMaintainer.java | 2 +-
.../index/IndexMetaDataCacheFactory.java | 2 +-
.../index/PhoenixIndexMetaDataBuilder.java | 2 +-
.../query/ConnectionQueryServicesImpl.java | 2 +-
.../query/ConnectionlessQueryServicesImpl.java | 2 +-
.../transaction/OmidTransactionProvider.java | 78 +++++++++++
.../transaction/TephraTransactionProvider.java | 76 +++++++++++
.../phoenix/transaction/TransactionFactory.java | 129 +------------------
.../transaction/TransactionProvider.java | 36 ++++++
.../org/apache/phoenix/util/PhoenixRuntime.java | 2 +-
.../apache/phoenix/util/TransactionUtil.java | 8 +-
.../java/org/apache/phoenix/query/BaseTest.java | 6 +-
16 files changed, 220 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 301768b..200cf1c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -225,9 +225,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
}
PhoenixTransactionContext txContext =
- TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+ TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
PhoenixTransactionalTable txTable =
- TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable);
+ TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
txContext.begin();
@@ -277,9 +277,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
// Repeat the same as above, but this time abort the transaction
txContext =
- TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+ TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
txTable =
- TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable);
+ TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
txContext.begin();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
index ca0c997..0c26ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.transaction.TransactionFactory;
public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
public PhoenixTransactionalProcessor() {
- super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoprocessor());
+ super(TransactionFactory.getTransactionProvider().getTransactionContext().getCoprocessor());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 189bc5b..dfbb89b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -183,15 +183,15 @@ public class MutationState implements SQLCloseable {
: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
if (!subTask) {
if (txContext == null) {
- phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection);
+ phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(connection);
} else {
isExternalTxContext = true;
- phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
+ phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
}
} else {
// this code path is only used while running child scans, we can't pass the txContext to child scans
// as it is not thread safe, so we use the tx member variable
- phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
+ phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
}
}
@@ -1224,7 +1224,7 @@ public class MutationState implements SQLCloseable {
}
public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException {
- return TransactionFactory.getTransactionFactory().getTransactionContext(txnBytes);
+ return TransactionFactory.getTransactionProvider().getTransactionContext(txnBytes);
}
private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index b5031af..7d6154e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -181,7 +181,7 @@ public class PhoenixTxIndexMutationGenerator {
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
scanRanges.initializeScan(scan);
- PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
+ PhoenixTransactionalTable txTable = TransactionFactory.getTransactionProvider().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
// For rollback, we need to see all versions, including
// the last committed version as there may be multiple
// checkpointed versions.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 9042557..15d8ac3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1068,7 +1068,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor
- || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
+ || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionProvider().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
nDeleteCF++;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 03db767..94fbd0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -52,7 +52,7 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
final PhoenixTransactionContext txnContext;
try {
- txnContext = txState.length != 0 ? TransactionFactory.getTransactionFactory().getTransactionContext(txState) : null;
+ txnContext = txState.length != 0 ? TransactionFactory.getTransactionProvider().getTransactionContext(txState) : null;
} catch (IOException e) {
throw new SQLException(e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
index c954cf0..5e6f756 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
@@ -63,7 +63,7 @@ public class PhoenixIndexMetaDataBuilder {
boolean useProto = md != null;
byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
- final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState);
+ final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionProvider().getTransactionContext(txState);
byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION);
final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
return new IndexMetaDataCache() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1899e37..eff406d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -400,7 +400,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
private void initTxServiceClient() {
- txZKClientService = TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config, props, connectionInfo);
+ txZKClientService = TransactionFactory.getTransactionProvider().getTransactionContext().setTransactionClient(config, props, connectionInfo);
}
private void openConnection() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index d25299a..c510b5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -136,7 +136,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
- TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config);
+ TransactionFactory.getTransactionProvider().getTransactionContext().setInMemoryTransactionClient(config);
this.guidePostsCache = new GuidePostsCache(this, config);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
new file mode 100644
index 0000000..b0c1bfe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+public class OmidTransactionProvider implements TransactionProvider {
+ private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
+
+ public static final OmidTransactionProvider getInstance() {
+ return INSTANCE;
+ }
+
+ private OmidTransactionProvider() {
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return new OmidTransactionContext();
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
+ //return new OmidTransactionContext(txnBytes);
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
+ //return new OmidTransactionContext(connection);
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
+ //return new OmidTransactionContext(contex, connection, subTask);
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
+ //return new OmidTransactionTable(ctx, htable);
+ return null;
+ }
+
+ @Override
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+ return CellUtil.createCell(row, family, HConstants.EMPTY_BYTE_ARRAY, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ @Override
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
new file mode 100644
index 0000000..795be9f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.tephra.TxConstants;
+
+public class TephraTransactionProvider implements TransactionProvider {
+ private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider();
+
+ public static final TephraTransactionProvider getInstance() {
+ return INSTANCE;
+ }
+
+ private TephraTransactionProvider() {
+ }
+
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return new TephraTransactionContext();
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
+ return new TephraTransactionContext(txnBytes);
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
+ return new TephraTransactionContext(connection);
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
+ return new TephraTransactionContext(contex, connection, subTask);
+ }
+
+ @Override
+ public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
+ return new TephraTransactionTable(ctx, htable);
+ }
+
+ @Override
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+ return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ @Override
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 37050fd..ea2822b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -17,140 +17,25 @@
*/
package org.apache.phoenix.transaction;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.tephra.TxConstants;
public class TransactionFactory {
-
- static private TransactionFactory transactionFactory = null;
-
- private TransactionProcessor tp = TransactionProcessor.Tephra;
-
enum TransactionProcessor {
Tephra,
Omid
}
- private TransactionFactory(TransactionProcessor tp) {
- this.tp = tp;
- }
-
- static public void createTransactionFactory(TransactionProcessor tp) {
- if (transactionFactory == null) {
- transactionFactory = new TransactionFactory(tp);
- }
- }
-
- static public TransactionFactory getTransactionFactory() {
- if (transactionFactory == null) {
- createTransactionFactory(TransactionProcessor.Tephra);
- }
-
- return transactionFactory;
- }
-
- public PhoenixTransactionContext getTransactionContext() {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
- case Tephra:
- ctx = new TephraTransactionContext();
- break;
- case Omid:
- ctx = new OmidTransactionContext();
- break;
- default:
- ctx = null;
- }
-
- return ctx;
- }
-
- public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
- case Tephra:
- ctx = new TephraTransactionContext(txnBytes);
- break;
- case Omid:
-// ctx = new OmidTransactionContext(txnBytes);
- break;
- default:
- ctx = null;
- }
-
- return ctx;
+ static public TransactionProvider getTransactionProvider() {
+ return TephraTransactionProvider.getInstance();
}
- public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
- case Tephra:
- ctx = new TephraTransactionContext(connection);
- break;
- case Omid:
-// ctx = new OmidTransactionContext(connection);
- break;
- default:
- ctx = null;
- }
-
- return ctx;
- }
-
- public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
+ static public TransactionProvider getTransactionProvider(TransactionProcessor processor) {
+ switch (processor) {
case Tephra:
- ctx = new TephraTransactionContext(contex, connection, subTask);
- break;
+ return TephraTransactionProvider.getInstance();
case Omid:
-// ctx = new OmidTransactionContext(contex, connection, subTask);
- break;
+ return OmidTransactionProvider.getInstance();
default:
- ctx = null;
+ throw new IllegalArgumentException("Unknown transaction processor: " + processor);
}
-
- return ctx;
- }
-
- public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
-
- PhoenixTransactionalTable table = null;
-
- switch(tp) {
- case Tephra:
- table = new TephraTransactionTable(ctx, htable);
- break;
- case Omid:
-// table = new OmidTransactionContext(contex, connection, subTask);
- break;
- default:
- table = null;
- }
-
- return table;
- }
-
- public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
- return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
- }
-
- public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
- return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
new file mode 100644
index 0000000..a5704f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+public interface TransactionProvider {
+ public PhoenixTransactionContext getTransactionContext();
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException;
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection);
+ public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask);
+
+ public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable);
+
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp);
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index bc381f8..1c25c33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -1515,7 +1515,7 @@ public class PhoenixRuntime {
* @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp.
*/
public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) {
- return TransactionFactory.getTransactionFactory().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
+ return TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
}
public static long getCurrentScn(ReadOnlyProps props) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 9cd5829..8f02adc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -52,11 +52,11 @@ public class TransactionUtil {
}
public static long convertToNanoseconds(long serverTimeStamp) {
- return serverTimeStamp * TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
+ return serverTimeStamp * TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
}
public static long convertToMilliseconds(long serverTimeStamp) {
- return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
+ return serverTimeStamp / TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
}
public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) {
@@ -107,7 +107,7 @@ public class TransactionUtil {
if (deleteMarker == null) {
deleteMarker = new Put(mutation.getRow());
}
- deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteFamilyMarker(
+ deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteFamilyMarker(
deleteMarker.getRow(),
family,
familyCells.get(0).getTimestamp()));
@@ -118,7 +118,7 @@ public class TransactionUtil {
if (deleteMarker == null) {
deleteMarker = new Put(mutation.getRow());
}
- deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteColumnMarker(
+ deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteColumnMarker(
deleteMarker.getRow(),
family,
CellUtil.cloneQualifier(cell),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ff8cd3d/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 326efa3..580becb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -415,15 +415,15 @@ public abstract class BaseTest {
}
private static void tearDownTxManager() throws SQLException {
- TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager();
+ TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager();
}
protected static void setTxnConfigs() throws IOException {
- TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
+ TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
}
protected static void setupTxManager() throws SQLException, IOException {
- TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl());
+ TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config, getUrl());
}
private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {