You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/03/22 16:57:19 UTC
[1/4] phoenix git commit: PHOENIX-4660 Use TransactionProvider
interface
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.3 d2d10ca2d -> 45e75de76
PHOENIX-4660 Use TransactionProvider interface
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/45e75de7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/45e75de7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/45e75de7
Branch: refs/heads/4.x-HBase-1.3
Commit: 45e75de7664e315219fe65c643f08c20b44a01aa
Parents: b61e72f
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 15:16:24 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Mar 22 09:45:35 2018 -0700
----------------------------------------------------------------------
.../phoenix/tx/FlappingTransactionIT.java | 8 +-
.../PhoenixTransactionalProcessor.java | 2 +-
.../apache/phoenix/execute/MutationState.java | 8 +-
.../PhoenixTxIndexMutationGenerator.java | 2 +-
.../apache/phoenix/index/IndexMaintainer.java | 2 +-
.../index/IndexMetaDataCacheFactory.java | 2 +-
.../index/PhoenixIndexMetaDataBuilder.java | 2 +-
.../query/ConnectionQueryServicesImpl.java | 2 +-
.../query/ConnectionlessQueryServicesImpl.java | 2 +-
.../transaction/OmidTransactionProvider.java | 78 +++++++++++
.../transaction/TephraTransactionProvider.java | 76 +++++++++++
.../phoenix/transaction/TransactionFactory.java | 129 +------------------
.../transaction/TransactionProvider.java | 36 ++++++
.../org/apache/phoenix/util/PhoenixRuntime.java | 2 +-
.../apache/phoenix/util/TransactionUtil.java | 8 +-
.../java/org/apache/phoenix/query/BaseTest.java | 6 +-
16 files changed, 220 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 301768b..200cf1c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -225,9 +225,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
}
PhoenixTransactionContext txContext =
- TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+ TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
PhoenixTransactionalTable txTable =
- TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable);
+ TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
txContext.begin();
@@ -277,9 +277,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
// Repeat the same as above, but this time abort the transaction
txContext =
- TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+ TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
txTable =
- TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable);
+ TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable);
txContext.begin();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
index ca0c997..0c26ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.transaction.TransactionFactory;
public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
public PhoenixTransactionalProcessor() {
- super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoprocessor());
+ super(TransactionFactory.getTransactionProvider().getTransactionContext().getCoprocessor());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 189bc5b..dfbb89b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -183,15 +183,15 @@ public class MutationState implements SQLCloseable {
: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
if (!subTask) {
if (txContext == null) {
- phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection);
+ phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(connection);
} else {
isExternalTxContext = true;
- phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
+ phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
}
} else {
// this code path is only used while running child scans, we can't pass the txContext to child scans
// as it is not thread safe, so we use the tx member variable
- phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
+ phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
}
}
@@ -1224,7 +1224,7 @@ public class MutationState implements SQLCloseable {
}
public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException {
- return TransactionFactory.getTransactionFactory().getTransactionContext(txnBytes);
+ return TransactionFactory.getTransactionProvider().getTransactionContext(txnBytes);
}
private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index b5031af..7d6154e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -181,7 +181,7 @@ public class PhoenixTxIndexMutationGenerator {
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
scanRanges.initializeScan(scan);
- PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
+ PhoenixTransactionalTable txTable = TransactionFactory.getTransactionProvider().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
// For rollback, we need to see all versions, including
// the last committed version as there may be multiple
// checkpointed versions.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 9042557..15d8ac3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1068,7 +1068,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor
- || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
+ || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionProvider().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
nDeleteCF++;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 03db767..94fbd0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -52,7 +52,7 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
final PhoenixTransactionContext txnContext;
try {
- txnContext = txState.length != 0 ? TransactionFactory.getTransactionFactory().getTransactionContext(txState) : null;
+ txnContext = txState.length != 0 ? TransactionFactory.getTransactionProvider().getTransactionContext(txState) : null;
} catch (IOException e) {
throw new SQLException(e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
index c954cf0..5e6f756 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
@@ -63,7 +63,7 @@ public class PhoenixIndexMetaDataBuilder {
boolean useProto = md != null;
byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
- final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState);
+ final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionProvider().getTransactionContext(txState);
byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION);
final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
return new IndexMetaDataCache() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1899e37..eff406d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -400,7 +400,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
private void initTxServiceClient() {
- txZKClientService = TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config, props, connectionInfo);
+ txZKClientService = TransactionFactory.getTransactionProvider().getTransactionContext().setTransactionClient(config, props, connectionInfo);
}
private void openConnection() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index d25299a..c510b5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -136,7 +136,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
// Without making a copy of the configuration we cons up, we lose some of our properties
// on the server side during testing.
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
- TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config);
+ TransactionFactory.getTransactionProvider().getTransactionContext().setInMemoryTransactionClient(config);
this.guidePostsCache = new GuidePostsCache(this, config);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
new file mode 100644
index 0000000..b0c1bfe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+public class OmidTransactionProvider implements TransactionProvider {
+ private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
+
+ public static final OmidTransactionProvider getInstance() {
+ return INSTANCE;
+ }
+
+ private OmidTransactionProvider() {
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return new OmidTransactionContext();
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
+ //return new OmidTransactionContext(txnBytes);
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
+ //return new OmidTransactionContext(connection);
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
+ //return new OmidTransactionContext(contex, connection, subTask);
+ return null;
+ }
+
+ @Override
+ public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
+ //return new OmidTransactionTable(ctx, htable);
+ return null;
+ }
+
+ @Override
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+ return CellUtil.createCell(row, family, HConstants.EMPTY_BYTE_ARRAY, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ @Override
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
new file mode 100644
index 0000000..795be9f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.tephra.TxConstants;
+
+public class TephraTransactionProvider implements TransactionProvider {
+ private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider();
+
+ public static final TephraTransactionProvider getInstance() {
+ return INSTANCE;
+ }
+
+ private TephraTransactionProvider() {
+ }
+
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return new TephraTransactionContext();
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
+ return new TephraTransactionContext(txnBytes);
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
+ return new TephraTransactionContext(connection);
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
+ return new TephraTransactionContext(contex, connection, subTask);
+ }
+
+ @Override
+ public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
+ return new TephraTransactionTable(ctx, htable);
+ }
+
+ @Override
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+ return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ @Override
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 37050fd..ea2822b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -17,140 +17,25 @@
*/
package org.apache.phoenix.transaction;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.tephra.TxConstants;
public class TransactionFactory {
-
- static private TransactionFactory transactionFactory = null;
-
- private TransactionProcessor tp = TransactionProcessor.Tephra;
-
enum TransactionProcessor {
Tephra,
Omid
}
- private TransactionFactory(TransactionProcessor tp) {
- this.tp = tp;
- }
-
- static public void createTransactionFactory(TransactionProcessor tp) {
- if (transactionFactory == null) {
- transactionFactory = new TransactionFactory(tp);
- }
- }
-
- static public TransactionFactory getTransactionFactory() {
- if (transactionFactory == null) {
- createTransactionFactory(TransactionProcessor.Tephra);
- }
-
- return transactionFactory;
- }
-
- public PhoenixTransactionContext getTransactionContext() {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
- case Tephra:
- ctx = new TephraTransactionContext();
- break;
- case Omid:
- ctx = new OmidTransactionContext();
- break;
- default:
- ctx = null;
- }
-
- return ctx;
- }
-
- public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
- case Tephra:
- ctx = new TephraTransactionContext(txnBytes);
- break;
- case Omid:
-// ctx = new OmidTransactionContext(txnBytes);
- break;
- default:
- ctx = null;
- }
-
- return ctx;
+ static public TransactionProvider getTransactionProvider() {
+ return TephraTransactionProvider.getInstance();
}
- public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
- case Tephra:
- ctx = new TephraTransactionContext(connection);
- break;
- case Omid:
-// ctx = new OmidTransactionContext(connection);
- break;
- default:
- ctx = null;
- }
-
- return ctx;
- }
-
- public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
-
- PhoenixTransactionContext ctx = null;
-
- switch(tp) {
+ static public TransactionProvider getTransactionProvider(TransactionProcessor processor) {
+ switch (processor) {
case Tephra:
- ctx = new TephraTransactionContext(contex, connection, subTask);
- break;
+ return TephraTransactionProvider.getInstance();
case Omid:
-// ctx = new OmidTransactionContext(contex, connection, subTask);
- break;
+ return OmidTransactionProvider.getInstance();
default:
- ctx = null;
+ throw new IllegalArgumentException("Unknown transaction processor: " + processor);
}
-
- return ctx;
- }
-
- public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
-
- PhoenixTransactionalTable table = null;
-
- switch(tp) {
- case Tephra:
- table = new TephraTransactionTable(ctx, htable);
- break;
- case Omid:
-// table = new OmidTransactionContext(contex, connection, subTask);
- break;
- default:
- table = null;
- }
-
- return table;
- }
-
- public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
- return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
- }
-
- public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
- return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
new file mode 100644
index 0000000..a5704f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+public interface TransactionProvider {
+ public PhoenixTransactionContext getTransactionContext();
+ public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException;
+ public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection);
+ public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask);
+
+ public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable);
+
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp);
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index bc381f8..1c25c33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -1515,7 +1515,7 @@ public class PhoenixRuntime {
* @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp.
*/
public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) {
- return TransactionFactory.getTransactionFactory().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
+ return TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
}
public static long getCurrentScn(ReadOnlyProps props) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 9cd5829..8f02adc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -52,11 +52,11 @@ public class TransactionUtil {
}
public static long convertToNanoseconds(long serverTimeStamp) {
- return serverTimeStamp * TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
+ return serverTimeStamp * TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
}
public static long convertToMilliseconds(long serverTimeStamp) {
- return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond();
+ return serverTimeStamp / TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond();
}
public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) {
@@ -107,7 +107,7 @@ public class TransactionUtil {
if (deleteMarker == null) {
deleteMarker = new Put(mutation.getRow());
}
- deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteFamilyMarker(
+ deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteFamilyMarker(
deleteMarker.getRow(),
family,
familyCells.get(0).getTimestamp()));
@@ -118,7 +118,7 @@ public class TransactionUtil {
if (deleteMarker == null) {
deleteMarker = new Put(mutation.getRow());
}
- deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteColumnMarker(
+ deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteColumnMarker(
deleteMarker.getRow(),
family,
CellUtil.cloneQualifier(cell),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/45e75de7/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 326efa3..580becb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -415,15 +415,15 @@ public abstract class BaseTest {
}
private static void tearDownTxManager() throws SQLException {
- TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager();
+ TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager();
}
protected static void setTxnConfigs() throws IOException {
- TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
+ TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS);
}
protected static void setupTxManager() throws SQLException, IOException {
- TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl());
+ TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config, getUrl());
}
private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
[4/4] phoenix git commit: PHOENIX-4619 Process transactional updates
to local index on server-side
Posted by ja...@apache.org.
PHOENIX-4619 Process transactional updates to local index on server-side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5814fcbe
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5814fcbe
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5814fcbe
Branch: refs/heads/4.x-HBase-1.3
Commit: 5814fcbe0529efb9e1d308c09870cefbf1872a95
Parents: d2d10ca
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 12:52:38 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Mar 22 09:45:35 2018 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/BaseIndexIT.java | 16 +-
.../phoenix/end2end/index/ImmutableIndexIT.java | 3 +-
.../end2end/index/MutableIndexFailureIT.java | 8 +-
.../apache/phoenix/compile/DeleteCompiler.java | 36 +-
.../PhoenixTransactionalProcessor.java | 2 +-
.../apache/phoenix/execute/MutationState.java | 103 +++-
.../PhoenixTxIndexMutationGenerator.java | 449 ++++++++++++++++
.../PhoenixTxnIndexMutationGenerator.java | 519 -------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 1 -
.../hbase/index/builder/BaseIndexBuilder.java | 4 +-
.../hbase/index/builder/BaseIndexCodec.java | 7 -
.../phoenix/hbase/index/covered/IndexCodec.java | 14 +-
.../hbase/index/covered/LocalTableState.java | 10 +-
.../hbase/index/covered/NonTxIndexBuilder.java | 2 +-
.../phoenix/hbase/index/covered/TableState.java | 8 -
.../apache/phoenix/index/IndexMaintainer.java | 23 +-
.../phoenix/index/PhoenixIndexBuilder.java | 21 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 34 +-
.../phoenix/index/PhoenixIndexMetaData.java | 78 +--
.../index/PhoenixIndexMetaDataBuilder.java | 106 ++++
.../index/PhoenixTransactionalIndexer.java | 442 +---------------
.../query/ConnectionQueryServicesImpl.java | 8 +
.../transaction/OmidTransactionContext.java | 2 +-
.../transaction/PhoenixTransactionContext.java | 2 +-
.../transaction/TephraTransactionContext.java | 2 +-
.../index/covered/CoveredColumnIndexCodec.java | 6 +-
.../covered/CoveredIndexCodecForTesting.java | 5 +-
.../index/covered/LocalTableStateTest.java | 10 +-
.../index/covered/NonTxIndexBuilderTest.java | 3 +
.../covered/TestCoveredColumnIndexCodec.java | 6 +-
30 files changed, 785 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
index 1483c58..f914256 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
@@ -239,15 +239,17 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT {
}
private void assertNoClientSideIndexMutations(Connection conn) throws SQLException {
- if (mutable) {
- Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
- if (iterator.hasNext()) {
- byte[] tableName = iterator.next().getFirst(); // skip data table mutations
- PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName));
+ Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
+ if (iterator.hasNext()) {
+ byte[] tableName = iterator.next().getFirst(); // skip data table mutations
+ PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName));
+ boolean clientSideUpdate = !localIndex && (!mutable || transactional);
+ if (!clientSideUpdate) {
assertTrue(table.getType() == PTableType.TABLE); // should be data table
- boolean hasIndexData = iterator.hasNext();
- assertFalse(hasIndexData && !transactional); // should have no index data
}
+ boolean hasIndexData = iterator.hasNext();
+ // global immutable and global transactional tables are processed client side
+ assertEquals(clientSideUpdate, hasIndexData);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index d520824..1db9787 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -252,8 +252,9 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
Iterator<Pair<byte[], List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
assertTrue(iterator.hasNext());
iterator.next();
- assertEquals((!localIndex || transactional), iterator.hasNext());
+ assertEquals(!localIndex, iterator.hasNext());
}
+
// This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back.
@Ignore
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 7e1367d..38bde7f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -112,7 +112,7 @@ public class MutableIndexFailureIT extends BaseTest {
public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) {
this.transactional = transactional;
this.localIndex = localIndex;
- this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "")
+ this.tableDDLOptions = " SALT_BUCKETS=2, COLUMN_ENCODED_BYTES=NONE" + (transactional ? ", TRANSACTIONAL=true " : "")
+ (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure))
+ (throwIndexWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.THROW_INDEX_WRITE_FAILURE + "=" + throwIndexWriteFailure));
this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
@@ -289,7 +289,6 @@ public class MutableIndexFailureIT extends BaseTest {
assertEquals("z", rs.getString(2));
assertFalse(rs.next());
- FailingRegionObserver.FAIL_WRITE = true;
updateTable(conn, true);
// Verify the metadata for index is correct.
rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
@@ -466,9 +465,12 @@ public class MutableIndexFailureIT extends BaseTest {
stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
stmt.setString(1, "b");
stmt.execute();
+ // Set to fail after the DELETE, since transactional tables will write
+ // uncommitted data when the DELETE is executed.
+ FailingRegionObserver.FAIL_WRITE = true;
try {
conn.commit();
- if (commitShouldFail && !localIndex && this.throwIndexWriteFailure) {
+ if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) {
fail();
}
} catch (CommitException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 70043bb..7985314 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -226,7 +226,7 @@ public class DeleteCompiler {
// When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
// row key will already have its value.
// Check for otherTableRefs being empty required when deleting directly from the index
- if (otherTableRefs.isEmpty() || (table.getIndexType() != IndexType.LOCAL && table.isImmutableRows())) {
+ if (otherTableRefs.isEmpty() || isMaintainedOnClient(table)) {
mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
for (int i = 0; i < otherTableRefs.size(); i++) {
@@ -311,12 +311,12 @@ public class DeleteCompiler {
}
}
- private List<PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) {
+ private List<PTable> getClientSideMaintainedIndexes(TableRef tableRef) {
PTable table = tableRef.getTable();
- if (table.isImmutableRows() && !table.getIndexes().isEmpty()) {
+ if (!table.getIndexes().isEmpty()) {
List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
for (PTable index : table.getIndexes()) {
- if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) {
+ if (index.getIndexState() != PIndexState.DISABLE && isMaintainedOnClient(index)) {
nonDisabledIndexes.add(index);
}
}
@@ -459,8 +459,8 @@ public class DeleteCompiler {
.setTableName(tableName).build().buildException();
}
- List<PTable> immutableIndexes = getNonDisabledGlobalImmutableIndexes(targetTableRef);
- final boolean hasImmutableIndexes = !immutableIndexes.isEmpty();
+ List<PTable> clientSideIndexes = getClientSideMaintainedIndexes(targetTableRef);
+ final boolean hasClientSideIndexes = !clientSideIndexes.isEmpty();
boolean isSalted = table.getBucketNum() != null;
boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
@@ -468,7 +468,7 @@ public class DeleteCompiler {
int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset;
int selectColumnCount = pkColumnCount;
- for (PTable index : immutableIndexes) {
+ for (PTable index : clientSideIndexes) {
selectColumnCount += index.getPKColumns().size() - pkColumnCount;
}
Set<PColumn> projectedColumns = new LinkedHashSet<PColumn>(selectColumnCount + pkColumnOffset);
@@ -518,7 +518,7 @@ public class DeleteCompiler {
// that is being upserted for conflict detection purposes.
// If we have immutable indexes, we'd increase the number of bytes scanned by executing
// separate queries against each index, so better to drive from a single table in that case.
- boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasImmutableIndexes;
+ boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasClientSideIndexes;
HintNode hint = delete.getHint();
if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE));
@@ -529,7 +529,7 @@ public class DeleteCompiler {
QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement));
final QueryPlan dataPlan = compiler.compile();
// TODO: the select clause should know that there's a sub query, but doesn't seem to currently
- queryPlans = Lists.newArrayList(!immutableIndexes.isEmpty()
+ queryPlans = Lists.newArrayList(!clientSideIndexes.isEmpty()
? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)
: optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe));
// Filter out any local indexes that don't contain all indexed columns.
@@ -559,7 +559,7 @@ public class DeleteCompiler {
// may have been optimized out. Instead, we check that there's a single SkipScanFilter
// If we can generate a plan for every index, that means all the required columns are available in every index,
// hence we can drive the delete from any of the plans.
- noQueryReqd &= queryPlans.size() == 1 + immutableIndexes.size();
+ noQueryReqd &= queryPlans.size() == 1 + clientSideIndexes.size();
int queryPlanIndex = 0;
while (noQueryReqd && queryPlanIndex < queryPlans.size()) {
QueryPlan plan = queryPlans.get(queryPlanIndex++);
@@ -578,7 +578,6 @@ public class DeleteCompiler {
// from the data table, while the others will be for deleting rows from immutable indexes.
List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
for (final QueryPlan plan : queryPlans) {
- final StatementContext context = plan.getContext();
mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes));
}
return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
@@ -628,8 +627,8 @@ public class DeleteCompiler {
}
}
final QueryPlan bestPlan = bestPlanToBe;
- final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(immutableIndexes.size());
- for (PTable index : immutableIndexes) {
+ final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(clientSideIndexes.size());
+ for (PTable index : clientSideIndexes) {
if (!bestPlan.getTableRef().getTable().equals(index)) {
otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp()));
}
@@ -917,13 +916,13 @@ public class DeleteCompiler {
int totalTablesUpdateClientSide = 1; // data table is always updated
PTable bestTable = bestPlan.getTableRef().getTable();
// global immutable tables are also updated client side (but don't double count the data table)
- if (bestPlan != dataPlan && bestTable.getIndexType() == IndexType.GLOBAL && bestTable.isImmutableRows()) {
+ if (bestPlan != dataPlan && isMaintainedOnClient(bestTable)) {
totalTablesUpdateClientSide++;
}
for (TableRef otherTableRef : otherTableRefs) {
PTable otherTable = otherTableRef.getTable();
// Don't double count the data table here (which morphs when it becomes a projected table, hence this check)
- if (projectedTableRef != otherTableRef && otherTable.getIndexType() == IndexType.GLOBAL && otherTable.isImmutableRows()) {
+ if (projectedTableRef != otherTableRef && isMaintainedOnClient(otherTable)) {
totalTablesUpdateClientSide++;
}
}
@@ -972,4 +971,11 @@ public class DeleteCompiler {
return bestPlan;
}
}
+
+ private static boolean isMaintainedOnClient(PTable table) {
+ // Test for not being local (rather than being GLOBAL) so that this doesn't fail
+ // when tested with our projected table.
+ return table.getIndexType() != IndexType.LOCAL && (table.isImmutableRows() || table.isTransactional());
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
index 37fa2ab..ca0c997 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.transaction.TransactionFactory;
public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
public PhoenixTransactionalProcessor() {
- super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor());
+ super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoprocessor());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d3b3ca0..189bc5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.TraceScope;
+import org.apache.phoenix.cache.IndexMetaDataCache;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.MutationPlan;
@@ -62,6 +63,7 @@ import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -138,7 +140,6 @@ public class MutationState implements SQLCloseable {
private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
final PhoenixTransactionContext phoenixTransactionContext;
- final PhoenixTxnIndexMutationGenerator phoenixTxnIndexMutationGenerator;
private final MutationMetricQueue mutationMetricQueue;
private ReadMetricQueue readMetricQueue;
@@ -180,7 +181,7 @@ public class MutationState implements SQLCloseable {
boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
- if (subTask == false) {
+ if (!subTask) {
if (txContext == null) {
phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection);
} else {
@@ -192,8 +193,6 @@ public class MutationState implements SQLCloseable {
// as it is not thread safe, so we use the tx member variable
phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
}
-
- phoenixTxnIndexMutationGenerator = new PhoenixTxnIndexMutationGenerator(connection, phoenixTransactionContext);
}
public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
@@ -496,17 +495,20 @@ public class MutationState implements SQLCloseable {
private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
final PTable table = tableRef.getTable();
- final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
- (includeAllIndexes || table.isTransactional()) ?
+ final Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism
+ includeAllIndexes ?
IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) :
- (table.isImmutableRows()) ?
+ (table.isImmutableRows() || table.isTransactional()) ?
IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) :
Collections.<PTable>emptyIterator();
+ final List<PTable> indexList = Lists.newArrayList(indexIterator);
+ final Iterator<PTable> indexes = indexList.iterator();
final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex);
return new Iterator<Pair<PName,List<Mutation>>>() {
boolean isFirst = true;
+ Map<byte[],List<Mutation>> indexMutationsMap = null;
@Override
public boolean hasNext() {
@@ -519,15 +521,34 @@ public class MutationState implements SQLCloseable {
isFirst = false;
return new Pair<PName,List<Mutation>>(table.getPhysicalName(), mutationList);
}
+
PTable index = indexes.next();
- List<Mutation> indexMutations;
+
+ List<Mutation> indexMutations = null;
try {
- if ((table.isImmutableRows() && (index.getIndexType() != IndexType.LOCAL)) || !table.isTransactional()) {
- indexMutations =
- IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex,
- connection.getKeyValueBuilder(), connection);
- } else {
- indexMutations = phoenixTxnIndexMutationGenerator.getIndexUpdates(table, index, mutationsPertainingToIndex);
+ if (!mutationsPertainingToIndex.isEmpty()) {
+ if (table.isTransactional()) {
+ if (indexMutationsMap == null) {
+ PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
+ try (HTableInterface htable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes())) {
+ Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable, mutationsPertainingToIndex.iterator());
+ indexMutationsMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ for (Pair<Mutation, byte[]> mutation : allMutations) {
+ List<Mutation> mutations = indexMutationsMap.get(mutation.getSecond());
+ if (mutations == null) {
+ mutations = Lists.newArrayList();
+ indexMutationsMap.put(mutation.getSecond(), mutations);
+ }
+ mutations.add(mutation.getFirst());
+ }
+ }
+ }
+ indexMutations = indexMutationsMap.get(index.getPhysicalName().getBytes());
+ } else {
+ indexMutations =
+ IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex,
+ connection.getKeyValueBuilder(), connection);
+ }
}
// we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
@@ -537,13 +558,17 @@ public class MutationState implements SQLCloseable {
if (multiRowMutationState!=null) {
final List<Mutation> deleteMutations = Lists.newArrayList();
generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
- indexMutations.addAll(deleteMutations);
+ if (indexMutations == null) {
+ indexMutations = deleteMutations;
+ } else {
+ indexMutations.addAll(deleteMutations);
+ }
}
}
} catch (SQLException | IOException e) {
throw new IllegalDataException(e);
}
- return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations);
+ return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations == null ? Collections.<Mutation>emptyList() : indexMutations);
}
@Override
@@ -554,6 +579,42 @@ public class MutationState implements SQLCloseable {
};
}
+ private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes, Map<String,byte[]> attributes) {
+ final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
+ for (PTable index : indexes) {
+ IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+ indexMaintainers.add(maintainer);
+ }
+ IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return phoenixTransactionContext;
+ }
+
+ @Override
+ public int getClientVersion() {
+ return MetaDataProtocol.PHOENIX_VERSION;
+ }
+
+ };
+ try {
+ PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
+ return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, table.getPhysicalName().getBytes());
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+
private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
final long serverTimestamp, final MultiRowMutationState values,
final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
@@ -585,17 +646,13 @@ public class MutationState implements SQLCloseable {
}
}
}
- PRow row =
- tableRef.getTable()
- .newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
+ PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
List<Mutation> rowMutations, rowMutationsPertainingToIndex;
if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
row.delete();
rowMutations = row.toRowMutations();
- // Row deletes for index tables are processed by running a re-written query
- // against the index table (as this allows for flexibility in being able to
- // delete rows).
- rowMutationsPertainingToIndex = rowMutations;
+ // The DeleteCompiler already generates the deletes for indexes, so no need to do it again
+ rowMutationsPertainingToIndex = Collections.emptyList();
} else {
for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues()
.entrySet()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
new file mode 100644
index 0000000..b5031af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.MultiMutation;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
+
+
+public class PhoenixTxIndexMutationGenerator {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixTxIndexMutationGenerator.class);
+
+ private final PhoenixIndexCodec codec;
+ private final PhoenixIndexMetaData indexMetaData;
+
+ public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+ this.indexMetaData = indexMetaData;
+ this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName);
+ }
+
+ public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
+ this(conf, indexMetaData, tableName, null, null);
+ }
+
+ private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+ MultiMutation stored = mutations.get(row);
+ // we haven't seen this row before, so add it
+ if (stored == null) {
+ stored = new MultiMutation(row);
+ mutations.put(row, stored);
+ }
+ stored.addAll(m);
+ }
+
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<Mutation> mutationIterator) throws IOException, SQLException {
+
+ if (!mutationIterator.hasNext()) {
+ return Collections.emptyList();
+ }
+
+ List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
+ ResultScanner currentScanner = null;
+ // Collect up all mutations in batch
+ Map<ImmutableBytesPtr, MultiMutation> mutations =
+ new HashMap<ImmutableBytesPtr, MultiMutation>();
+ // Collect the set of mutable ColumnReferences so that we can first
+ // run a scan to get the current state. We'll need this to delete
+ // the existing index rows.
+ int estimatedSize = indexMaintainers.size() * 10;
+ Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
+ for (IndexMaintainer indexMaintainer : indexMaintainers) {
+ // For transactional tables, we use an index maintainer
+ // to aid in rollback if there's a KeyValue column in the index. The alternative would be
+ // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
+ // client side.
+ Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+ mutableColumns.addAll(allColumns);
+ }
+
+ Mutation m = mutationIterator.next();
+ Map<String,byte[]> updateAttributes = m.getAttributesMap();
+ byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
+ boolean isRollback = txRollbackAttribute!=null;
+
+ boolean isImmutable = indexMetaData.isImmutableRows();
+ Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
+ if (isImmutable && !isRollback) {
+ findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
+ } else {
+ findPriorValueMutations = mutations;
+ }
+
+ while (true) {
+ // add the mutation to the batch set
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ // if we have no non PK columns, no need to find the prior values
+ if ( mutations != findPriorValueMutations && indexMetaData.requiresPriorRowState(m) ) {
+ addMutation(findPriorValueMutations, row, m);
+ }
+ addMutation(mutations, row, m);
+
+ if (!mutationIterator.hasNext()) {
+ break;
+ }
+ m = mutationIterator.next();
+ }
+
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
+ // Track if we have row keys with Delete mutations (or Puts that are
+ // Tephra's Delete marker). If there are none, we don't need to do the scan for
+ // prior versions, if there are, we do. Since rollbacks always have delete mutations,
+ // this logic will work there too.
+ if (!findPriorValueMutations.isEmpty()) {
+ List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
+ for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
+ }
+ Scan scan = new Scan();
+ // Project all mutable columns
+ for (ColumnReference ref : mutableColumns) {
+ scan.addColumn(ref.getFamily(), ref.getQualifier());
+ }
+ /*
+ * Indexes inherit the storage scheme of the data table which means all the indexes have the same
+ * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
+ * supporting new indexes over existing data tables to have a different storage scheme than the data
+ * table.
+ */
+ byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+
+ // Project empty key value column
+ scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
+ ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
+ scanRanges.initializeScan(scan);
+ PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
+ // For rollback, we need to see all versions, including
+ // the last committed version as there may be multiple
+ // checkpointed versions.
+ SkipScanFilter filter = scanRanges.getSkipScanFilter();
+ if (isRollback) {
+ filter = new SkipScanFilter(filter,true);
+ indexMetaData.getTransactionContext().setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
+ }
+ scan.setFilter(filter);
+ currentScanner = txTable.getScanner(scan);
+ }
+ if (isRollback) {
+ processRollback(indexMetaData, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations);
+ } else {
+ processMutation(indexMetaData, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
+ }
+
+ return indexUpdates;
+ }
+
+ private void processMutation(PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
+ ResultScanner scanner,
+ Set<ColumnReference> upsertColumns,
+ Collection<Pair<Mutation, byte[]>> indexUpdates,
+ Map<ImmutableBytesPtr, MultiMutation> mutations,
+ Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
+ List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
+ if (scanner != null) {
+ Result result;
+ ColumnReference emptyColRef = new ColumnReference(indexMaintainers.get(0)
+ .getDataEmptyKeyValueCF(), indexMaintainers.get(0).getEmptyKeyValueQualifier());
+ // Process existing data table rows by removing the old index row and adding the new index row
+ while ((result = scanner.next()) != null) {
+ Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
+ TxTableState state = new TxTableState(upsertColumns, indexMetaData.getTransactionContext().getWritePointer(), m, emptyColRef, result);
+ generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+ generatePuts(indexMetaData, indexUpdates, state);
+ }
+ }
+ // Process new data table by adding new index rows
+ for (Mutation m : mutations.values()) {
+ TxTableState state = new TxTableState(upsertColumns, indexMetaData.getTransactionContext().getWritePointer(), m);
+ generatePuts(indexMetaData, indexUpdates, state);
+ generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+ }
+ }
+
+ private void processRollback(PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
+ ResultScanner scanner,
+ Set<ColumnReference> mutableColumns,
+ Collection<Pair<Mutation, byte[]>> indexUpdates,
+ Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
+ if (scanner != null) {
+ long readPtr = indexMetaData.getTransactionContext().getReadPointer();
+ Result result;
+ // Loop through last committed row state plus all new rows associated with current transaction
+ // to generate point delete markers for all index rows that were added. We don't have Tephra
+ // manage index rows in change sets because we don't want to be hit with the additional
+ // memory hit and do not need to do conflict detection on index rows.
+ ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
+ while ((result = scanner.next()) != null) {
+ Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
+ // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
+ // (as if we're "replaying" them in time order).
+ List<Cell> cells = result.listCells();
+ Collections.sort(cells, new Comparator<Cell>() {
+
+ @Override
+ public int compare(Cell o1, Cell o2) {
+ int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+ if (c != 0) return c;
+ c = o1.getTypeByte() - o2.getTypeByte();
+ if (c != 0) return c;
+ c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength());
+ if (c != 0) return c;
+ return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength());
+ }
+
+ });
+ int i = 0;
+ int nCells = cells.size();
+ Result oldResult = null, newResult;
+ do {
+ boolean hasPuts = false;
+ LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
+ long writePtr;
+ Cell cell = cells.get(i);
+ do {
+ hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
+ writePtr = cell.getTimestamp();
+ ListIterator<Cell> it = singleTimeCells.listIterator();
+ do {
+ // Add at the beginning of the list to match the expected HBase
+ // newest to oldest sort order (which TxTableState relies on
+ // with the Result.getLatestColumnValue() calls). However, we
+ // still want to add Cells in the expected order for each time
+ // bound as otherwise we won't find it in our old state.
+ it.add(cell);
+ } while (++i < nCells && (cell=cells.get(i)).getTimestamp() == writePtr);
+ } while (i < nCells && cell.getTimestamp() <= readPtr);
+
+ // Generate point delete markers for the prior row deletion of the old index value.
+ // The write timestamp is the next timestamp, not the current timestamp,
+ // as the earliest cells are the current values for the row (and we don't
+ // want to delete the current row).
+ if (oldResult != null) {
+ TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult);
+ generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+ }
+ // Generate point delete markers for the new index value.
+ // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
+ // generate deletes. We would have generated the delete above based on the state
+ // of the previous row. The delete markers do not give us the state we need to
+ // delete.
+ if (hasPuts) {
+ newResult = Result.create(singleTimeCells);
+ // First row may represent the current state which we don't want to delete
+ if (writePtr > readPtr) {
+ TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult);
+ generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+ }
+ oldResult = newResult;
+ } else {
+ oldResult = null;
+ }
+ } while (i < nCells);
+ }
+ }
+ }
+
+ private void generateDeletes(PhoenixIndexMetaData indexMetaData,
+ Collection<Pair<Mutation, byte[]>> indexUpdates,
+ byte[] attribValue, TxTableState state) throws IOException {
+ Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+ for (IndexUpdate delete : deletes) {
+ if (delete.isValid()) {
+ delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
+ indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+ }
+ }
+ }
+
+ private boolean generatePuts(
+ PhoenixIndexMetaData indexMetaData,
+ Collection<Pair<Mutation, byte[]>> indexUpdates,
+ TxTableState state)
+ throws IOException {
+ state.applyMutation();
+ Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+ boolean validPut = false;
+ for (IndexUpdate put : puts) {
+ if (put.isValid()) {
+ indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
+ validPut = true;
+ }
+ }
+ return validPut;
+ }
+
+
+ private static class TxTableState implements TableState {
+ private final Mutation mutation;
+ private final long currentTimestamp;
+ private final List<KeyValue> pendingUpdates;
+ private final Set<ColumnReference> indexedColumns;
+ private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
+
+ private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
+ this.currentTimestamp = currentTimestamp;
+ this.indexedColumns = indexedColumns;
+ this.mutation = mutation;
+ int estimatedSize = indexedColumns.size();
+ this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
+ this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
+ try {
+ CellScanner scanner = mutation.cellScanner();
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+ pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+
+ public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
+ this(indexedColumns, currentTimestamp, m);
+
+ for (ColumnReference ref : indexedColumns) {
+ Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+ if (cell != null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ valueMap.put(ref, ptr);
+ }
+ }
+ }
+
+ @Override
+ public long getCurrentTimestamp() {
+ return currentTimestamp;
+ }
+
+ @Override
+ public byte[] getCurrentRowKey() {
+ return mutation.getRow();
+ }
+
+ @Override
+ public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+ return Collections.emptyList();
+ }
+
+ private void applyMutation() {
+ for (Cell cell : pendingUpdates) {
+ if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ valueMap.remove(ref);
+ } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+ for (ColumnReference ref : indexedColumns) {
+ if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
+ valueMap.remove(ref);
+ }
+ }
+ } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
+ ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ if (indexedColumns.contains(ref)) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ valueMap.put(ref, ptr);
+ }
+ } else {
+ throw new IllegalStateException("Unexpected mutation type for " + cell);
+ }
+ }
+ }
+
+ @Override
+ public Collection<KeyValue> getPendingUpdate() {
+ return pendingUpdates;
+ }
+
+ @Override
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
+ throws IOException {
+ // TODO: creating these objects over and over again is wasteful
+ ColumnTracker tracker = new ColumnTracker(indexedColumns);
+ ValueGetter getter = new ValueGetter() {
+
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+ return valueMap.get(ref);
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return mutation.getRow();
+ }
+
+ };
+ Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
+ return pair;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
deleted file mode 100644
index b596b75..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.filter.SkipScanFilter;
-import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
-
-
-public class PhoenixTxnIndexMutationGenerator {
-
- private static final Log LOG = LogFactory.getLog(PhoenixTxnIndexMutationGenerator.class);
-
- private final PhoenixConnection connection;
- private final PhoenixTransactionContext phoenixTransactionContext;
-
- PhoenixTxnIndexMutationGenerator(PhoenixConnection connection, PhoenixTransactionContext phoenixTransactionContext) {
- this.phoenixTransactionContext = phoenixTransactionContext;
- this.connection = connection;
- }
-
- private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
- MultiMutation stored = mutations.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutations.put(row, stored);
- }
- stored.addAll(m);
- }
-
- public List<Mutation> getIndexUpdates(final PTable table, PTable index, List<Mutation> dataMutations) throws IOException, SQLException {
-
- if (dataMutations.isEmpty()) {
- return new ArrayList<Mutation>();
- }
-
- Map<String,byte[]> updateAttributes = dataMutations.get(0).getAttributesMap();
- boolean replyWrite = (BaseScannerRegionObserver.ReplayWrite.fromBytes(updateAttributes.get(BaseScannerRegionObserver.REPLAY_WRITES)) != null);
- byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
-
- IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
-
- boolean isRollback = txRollbackAttribute!=null;
- boolean isImmutable = index.isImmutableRows();
- ResultScanner currentScanner = null;
- HTableInterface txTable = null;
- // Collect up all mutations in batch
- Map<ImmutableBytesPtr, MultiMutation> mutations =
- new HashMap<ImmutableBytesPtr, MultiMutation>();
- Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
- if (isImmutable && !isRollback) {
- findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
- } else {
- findPriorValueMutations = mutations;
- }
- // Collect the set of mutable ColumnReferences so that we can first
- // run a scan to get the current state. We'll need this to delete
- // the existing index rows.
- int estimatedSize = 10;
- Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
- // For transactional tables, we use an index maintainer
- // to aid in rollback if there's a KeyValue column in the index. The alternative would be
- // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
- // client side.
- Set<ColumnReference> allColumns = maintainer.getAllColumns();
- mutableColumns.addAll(allColumns);
-
- for(final Mutation m : dataMutations) {
- // add the mutation to the batch set
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- // if we have no non PK columns, no need to find the prior values
-
- boolean requiresPriorRowState = !isImmutable || (maintainer.isRowDeleted(m) && !maintainer.getIndexedColumns().isEmpty());
- if (mutations != findPriorValueMutations && requiresPriorRowState) {
- addMutation(findPriorValueMutations, row, m);
- }
- addMutation(mutations, row, m);
- }
-
- List<Mutation> indexUpdates = new ArrayList<Mutation>(mutations.size() * 2);
- try {
- // Track if we have row keys with Delete mutations (or Puts that are
- // Tephra's Delete marker). If there are none, we don't need to do the scan for
- // prior versions, if there are, we do. Since rollbacks always have delete mutations,
- // this logic will work there too.
- if (!findPriorValueMutations.isEmpty()) {
- List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
- for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
- }
- Scan scan = new Scan();
- // Project all mutable columns
- for (ColumnReference ref : mutableColumns) {
- scan.addColumn(ref.getFamily(), ref.getQualifier());
- }
- /*
- * Indexes inherit the storage scheme of the data table which means all the indexes have the same
- * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
- * supporting new indexes over existing data tables to have a different storage scheme than the data
- * table.
- */
- byte[] emptyKeyValueQualifier = maintainer.getEmptyKeyValueQualifier();
-
- // Project empty key value column
- scan.addColumn(maintainer.getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
- ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
- scanRanges.initializeScan(scan);
- txTable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes());
- // For rollback, we need to see all versions, including
- // the last committed version as there may be multiple
- // checkpointed versions.
- SkipScanFilter filter = scanRanges.getSkipScanFilter();
- if (isRollback) {
- filter = new SkipScanFilter(filter,true);
- phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
- }
- scan.setFilter(filter);
- currentScanner = txTable.getScanner(scan);
- }
- if (isRollback) {
- processRollback(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, replyWrite, table);
- } else {
- processMutation(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations, replyWrite, table);
- }
- } finally {
- if (txTable != null) txTable.close();
- }
-
- return indexUpdates;
- }
-
- private void processMutation(IndexMaintainer maintainer,
- byte[] txRollbackAttribute,
- ResultScanner scanner,
- Set<ColumnReference> upsertColumns,
- Collection<Mutation> indexUpdates,
- Map<ImmutableBytesPtr, MultiMutation> mutations,
- Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue,
- boolean replyWrite,
- final PTable table) throws IOException, SQLException {
- if (scanner != null) {
- Result result;
- ColumnReference emptyColRef = new ColumnReference(maintainer
- .getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier());
- // Process existing data table rows by removing the old index row and adding the new index row
- while ((result = scanner.next()) != null) {
- Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
- TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m, emptyColRef, result);
- generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
- generatePuts(indexUpdates, state, maintainer, replyWrite, table);
- }
- }
- // Process new data table by adding new index rows
- for (Mutation m : mutations.values()) {
- TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m);
- generatePuts(indexUpdates, state, maintainer, replyWrite, table);
- generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
- }
- }
-
- private void processRollback(IndexMaintainer maintainer,
- byte[] txRollbackAttribute,
- ResultScanner scanner,
- Set<ColumnReference> mutableColumns,
- Collection<Mutation> indexUpdates,
- Map<ImmutableBytesPtr, MultiMutation> mutations,
- boolean replyWrite,
- final PTable table) throws IOException, SQLException {
- if (scanner != null) {
- Result result;
- // Loop through last committed row state plus all new rows associated with current transaction
- // to generate point delete markers for all index rows that were added. We don't have Tephra
- // manage index rows in change sets because we don't want to be hit with the additional
- // memory hit and do not need to do conflict detection on index rows.
- ColumnReference emptyColRef = new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier());
- while ((result = scanner.next()) != null) {
- Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
- // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
- // (as if we're "replaying" them in time order).
- List<Cell> cells = result.listCells();
- Collections.sort(cells, new Comparator<Cell>() {
-
- @Override
- public int compare(Cell o1, Cell o2) {
- int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
- if (c != 0) return c;
- c = o1.getTypeByte() - o2.getTypeByte();
- if (c != 0) return c;
- c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength());
- if (c != 0) return c;
- return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength());
- }
-
- });
- int i = 0;
- int nCells = cells.size();
- Result oldResult = null, newResult;
- long readPtr = phoenixTransactionContext.getReadPointer();
- do {
- boolean hasPuts = false;
- LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
- long writePtr;
- Cell cell = cells.get(i);
- do {
- hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
- writePtr = cell.getTimestamp();
- ListIterator<Cell> it = singleTimeCells.listIterator();
- do {
- // Add at the beginning of the list to match the expected HBase
- // newest to oldest sort order (which TxTableState relies on
- // with the Result.getLatestColumnValue() calls). However, we
- // still want to add Cells in the expected order for each time
- // bound as otherwise we won't find it in our old state.
- it.add(cell);
- } while (++i < nCells && (cell = cells.get(i)).getTimestamp() == writePtr);
- } while (i < nCells && cell.getTimestamp() <= readPtr);
-
- // Generate point delete markers for the prior row deletion of the old index value.
- // The write timestamp is the next timestamp, not the current timestamp,
- // as the earliest cells are the current values for the row (and we don't
- // want to delete the current row).
- if (oldResult != null) {
- TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult);
- generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
- }
- // Generate point delete markers for the new index value.
- // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
- // generate deletes. We would have generated the delete above based on the state
- // of the previous row. The delete markers do not give us the state we need to
- // delete.
- if (hasPuts) {
- newResult = Result.create(singleTimeCells);
- // First row may represent the current state which we don't want to delete
- if (writePtr > readPtr) {
- TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult);
- generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
- }
- oldResult = newResult;
- } else {
- oldResult = null;
- }
- } while (i < nCells);
- }
- }
- }
-
- private Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException {
- if (maintainer.isRowDeleted(state.getPendingUpdate())) {
- return Collections.emptyList();
- }
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(state.getCurrentRowKey());
- List<IndexUpdate> indexUpdates = Lists.newArrayList();
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), replyWrite, false, null);
- ValueGetter valueGetter = statePair.getFirst();
- IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.isLocalIndex() ? table.getName().getBytes() : maintainer.getIndexTableName());
-
- byte[] regionStartKey = null;
- byte[] regionEndkey = null;
- if(maintainer.isLocalIndex()) {
- HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey());
- regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
- regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
- }
-
- Put put = maintainer.buildUpdateMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), regionStartKey, regionEndkey);
- indexUpdate.setUpdate(put);
- indexUpdates.add(indexUpdate);
-
- return indexUpdates;
- }
-
- private Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(state.getCurrentRowKey());
- List<IndexUpdate> indexUpdates = Lists.newArrayList();
- // For transactional tables, we use an index maintainer
- // to aid in rollback if there's a KeyValue column in the index. The alternative would be
- // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
- // client side.
- Set<ColumnReference> cols = Sets.newHashSet(maintainer.getAllColumns());
- cols.add(new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()));
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(cols, replyWrite, true, null);
- ValueGetter valueGetter = statePair.getFirst();
- if (valueGetter!=null) {
- IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.isLocalIndex() ? table.getName().getBytes() : maintainer.getIndexTableName());
-
- byte[] regionStartKey = null;
- byte[] regionEndkey = null;
- if(maintainer.isLocalIndex()) {
- HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey());
- regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
- regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
- }
-
- Delete delete = maintainer.buildDeleteMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
- state.getCurrentTimestamp(), regionStartKey, regionEndkey);
- indexUpdate.setUpdate(delete);
- indexUpdates.add(indexUpdate);
- }
- return indexUpdates;
- }
-
- private void generateDeletes(Collection<Mutation> indexUpdates,
- byte[] attribValue,
- TxTableState state,
- IndexMaintainer maintainer,
- boolean replyWrite,
- final PTable table) throws IOException, SQLException {
- Iterable<IndexUpdate> deletes = getIndexDeletes(state, maintainer, replyWrite, table);
- for (IndexUpdate delete : deletes) {
- if (delete.isValid()) {
- delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
- indexUpdates.add(delete.getUpdate());
- }
- }
- }
-
- private boolean generatePuts(Collection<Mutation> indexUpdates,
- TxTableState state,
- IndexMaintainer maintainer,
- boolean replyWrite,
- final PTable table) throws IOException, SQLException {
- state.applyMutation();
- Iterable<IndexUpdate> puts = getIndexUpserts(state, maintainer, replyWrite, table);
- boolean validPut = false;
- for (IndexUpdate put : puts) {
- if (put.isValid()) {
- indexUpdates.add(put.getUpdate());
- validPut = true;
- }
- }
- return validPut;
- }
-
-
- private static class TxTableState implements TableState {
- private final Mutation mutation;
- private final long currentTimestamp;
- private final List<KeyValue> pendingUpdates;
- private final Set<ColumnReference> indexedColumns;
- private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
-
- private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
- this.currentTimestamp = currentTimestamp;
- this.indexedColumns = indexedColumns;
- this.mutation = mutation;
- int estimatedSize = indexedColumns.size();
- this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
- this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
- try {
- CellScanner scanner = mutation.cellScanner();
- while (scanner.advance()) {
- Cell cell = scanner.current();
- pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
- }
- } catch (IOException e) {
- throw new RuntimeException(e); // Impossible
- }
- }
-
- public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
- this(indexedColumns, currentTimestamp, m);
-
- for (ColumnReference ref : indexedColumns) {
- Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
- if (cell != null) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- valueMap.put(ref, ptr);
- }
- }
- }
-
- @Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return null;
- }
-
- @Override
- public long getCurrentTimestamp() {
- return currentTimestamp;
- }
-
- @Override
- public byte[] getCurrentRowKey() {
- return mutation.getRow();
- }
-
- @Override
- public List<? extends IndexedColumnGroup> getIndexColumnHints() {
- return Collections.emptyList();
- }
-
- private void applyMutation() {
- for (Cell cell : pendingUpdates) {
- if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
- ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- valueMap.remove(ref);
- } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
- for (ColumnReference ref : indexedColumns) {
- if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
- valueMap.remove(ref);
- }
- }
- } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
- ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- if (indexedColumns.contains(ref)) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- valueMap.put(ref, ptr);
- }
- } else {
- throw new IllegalStateException("Unexpected mutation type for " + cell);
- }
- }
- }
-
- @Override
- public Collection<KeyValue> getPendingUpdate() {
- return pendingUpdates;
- }
-
- @Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
- throws IOException {
- // TODO: creating these objects over and over again is wasteful
- ColumnTracker tracker = new ColumnTracker(indexedColumns);
- ValueGetter getter = new ValueGetter() {
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
- return valueMap.get(ref);
- }
-
- @Override
- public byte[] getRowKey() {
- return mutation.getRow();
- }
-
- };
- Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
- return pair;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index f77c162..7b9bc1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -87,7 +87,6 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index a2edd45..f13e97a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -58,9 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
meth.setAccessible(true);
this.codec = meth.newInstance();
- this.codec.initialize(env);
- } catch (IOException e) {
- throw e;
+ this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName());
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
index cf6e95e..7489a8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -20,16 +20,9 @@ package org.apache.phoenix.hbase.index.builder;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
public abstract class BaseIndexCodec implements IndexCodec {
-
- @Override
- public void initialize(RegionCoprocessorEnvironment env) throws IOException {
- // noop
- }
-
/**
* {@inheritDoc}
* <p>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index e6d683e..7dde941 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -11,9 +11,9 @@ package org.apache.phoenix.hbase.index.covered;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
/**
@@ -24,16 +24,6 @@ import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
*/
public interface IndexCodec {
/**
- * Do any code initialization necessary
- *
- * @param env
- * environment in which the codec is operating
- * @throws IOException
- * if the codec cannot be initalized correctly
- */
- public void initialize(RegionCoprocessorEnvironment env) throws IOException;
-
- /**
* Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is
* specified and no columns are returned) mapped to the table name. For instance, to you have an index 'myIndex'
* with row :
@@ -89,4 +79,6 @@ public interface IndexCodec {
* @throws IOException
*/
public boolean isEnabled(Mutation m) throws IOException;
+
+ public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName);
}
\ No newline at end of file
[3/4] phoenix git commit: PHOENIX-4619 Process transactional updates
to local index on server-side
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 9bd4db8..4f65416 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
@@ -44,7 +43,6 @@ import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
public class LocalTableState implements TableState {
private long ts;
- private RegionCoprocessorEnvironment env;
private KeyValueStore memstore;
private LocalHBaseState table;
private Mutation update;
@@ -54,8 +52,7 @@ public class LocalTableState implements TableState {
private List<? extends IndexedColumnGroup> hints;
private CoveredColumns columnSet;
- public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
- this.env = environment;
+ public LocalTableState(LocalHBaseState table, Mutation update) {
this.table = table;
this.update = update;
this.memstore = new IndexMemStore();
@@ -104,11 +101,6 @@ public class LocalTableState implements TableState {
}
@Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return this.env;
- }
-
- @Override
public long getCurrentTimestamp() {
return this.ts;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 8dd57c0..97ac30d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -51,7 +51,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
// create a state manager, so we can manage each batch
- LocalTableState state = new LocalTableState(env, localTable, mutation);
+ LocalTableState state = new LocalTableState(localTable, mutation);
// build the index updates for each group
IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 605cbe3..cb2b41f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -36,13 +35,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
*/
public interface TableState {
- // use this to get batch ids/ptable stuff
- /**
- * WARNING: messing with this can affect the indexing plumbing. Use with caution :)
- * @return get the current environment in which this table lives.
- */
- public RegionCoprocessorEnvironment getEnvironment();
-
/**
* @return the current timestamp up-to-which we are releasing table state.
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index fa60679..9042557 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -194,13 +194,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
*/
public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
List<PTable> indexes, PhoenixConnection connection) {
- Iterator<PTable> indexesItr = maintainedIndexes(indexes.iterator());
- if ((dataTable.isImmutableRows()) || !indexesItr.hasNext()) {
- indexesItr = maintainedLocalIndexes(indexesItr);
- if (!indexesItr.hasNext()) {
- ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
- return;
- }
+ Iterator<PTable> indexesItr;
+ boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
+ if (onlyLocalIndexes) {
+ indexesItr = maintainedLocalIndexes(indexes.iterator());
+ } else {
+ indexesItr = maintainedIndexes(indexes.iterator());
+ }
+ if (!indexesItr.hasNext()) {
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ return;
}
int nIndexes = 0;
while (indexesItr.hasNext()) {
@@ -214,9 +217,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1));
// Write out data row key schema once, since it's the same for all index maintainers
dataTable.getRowKeySchema().write(output);
- indexesItr =
- dataTable.isImmutableRows() ? maintainedLocalIndexes(indexes.iterator())
- : maintainedIndexes(indexes.iterator());
+ indexesItr = onlyLocalIndexes
+ ? maintainedLocalIndexes(indexes.iterator())
+ : maintainedIndexes(indexes.iterator());
while (indexesItr.hasNext()) {
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection));
byte[] protoBytes = proto.toByteArray();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 8b1e2f1..585631d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -28,7 +28,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -58,7 +57,6 @@ import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisit
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
@@ -77,7 +75,14 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true
private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN;
-
+ private PhoenixIndexMetaDataBuilder indexMetaDataBuilder;
+
+ @Override
+ public void setup(RegionCoprocessorEnvironment env) throws IOException {
+ super.setup(env);
+ this.indexMetaDataBuilder = new PhoenixIndexMetaDataBuilder(env);
+ }
+
private static List<Cell> flattenCells(Mutation m, int estimatedSize) throws IOException {
List<Cell> flattenedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
flattenCells(m, flattenedCells);
@@ -91,8 +96,8 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
}
@Override
- public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
+ public PhoenixIndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ return indexMetaDataBuilder.getIndexMetaData(miniBatchOp);
}
protected PhoenixIndexCodec getCodec() {
@@ -100,11 +105,6 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
}
@Override
- public void setup(RegionCoprocessorEnvironment env) throws IOException {
- super.setup(env);
- }
-
- @Override
public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
}
@@ -383,4 +383,5 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
public ReplayWrite getReplayWrite(Mutation m) {
return PhoenixIndexMetaData.getReplayWrite(m.getAttributesMap());
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index ebad7da..c5233d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -15,10 +15,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -46,13 +46,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
public static final String INDEX_MAINTAINERS = "IndexMaintainers";
public static final String CLIENT_VERSION = "_ClientVersion";
public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
+
+ private byte[] regionStartKey;
+ private byte[] regionEndKey;
+ private byte[] tableName;
+
+ public PhoenixIndexCodec() {
+
+ }
- private RegionCoprocessorEnvironment env;
+ public PhoenixIndexCodec(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
+ initialize(conf, regionStartKey, regionEndKey, tableName);
+ }
+
@Override
- public void initialize(RegionCoprocessorEnvironment env) throws IOException {
- super.initialize(env);
- this.env = env;
+ public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
+ this.regionStartKey = regionStartKey;
+ this.regionEndKey = regionEndKey;
+ this.tableName = tableName;
}
boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
@@ -76,10 +88,9 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.getReplayWrite() != null, false, context);
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion()
- .getTableDesc().getName() : maintainer.getIndexTableName());
- Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
- .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
+ indexUpdate.setTable(maintainer.isLocalIndex() ? tableName : maintainer.getIndexTableName());
+ Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(),
+ regionStartKey, regionEndKey);
indexUpdate.setUpdate(put);
indexUpdates.add(indexUpdate);
}
@@ -104,10 +115,9 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
ValueGetter valueGetter = statePair.getFirst();
if (valueGetter!=null) {
IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion()
- .getTableDesc().getName() : maintainer.getIndexTableName());
+ indexUpdate.setTable(maintainer.isLocalIndex() ? tableName : maintainer.getIndexTableName());
Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
- state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
+ state.getCurrentTimestamp(), regionStartKey, regionEndKey);
indexUpdate.setUpdate(delete);
indexUpdates.add(indexUpdate);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index cc254d3..46f5b77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -18,27 +18,15 @@
package org.apache.phoenix.index;
import java.io.IOException;
-import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ServerUtil;
public class PhoenixIndexMetaData implements IndexMetaData {
private final Map<String, byte[]> attributes;
@@ -46,61 +34,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
private final ReplayWrite replayWrite;
private final boolean isImmutable;
private final boolean hasNonPkColumns;
+ private final boolean hasLocalIndexes;
- private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
- if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
- byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
- if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
- boolean useProto = false;
- byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
- useProto = md != null;
- if (md == null) {
- md = attributes.get(PhoenixIndexCodec.INDEX_MD);
- }
- byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
- if (md != null) {
- final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
- final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState);
- byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION);
- final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
- return new IndexMetaDataCache() {
-
- @Override
- public void close() throws IOException {}
-
- @Override
- public List<IndexMaintainer> getIndexMaintainers() {
- return indexMaintainers;
- }
-
- @Override
- public PhoenixTransactionContext getTransactionContext() {
- return txnContext;
- }
-
- @Override
- public int getClientVersion() {
- return clientVersion;
- }
-
- };
- } else {
- byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
- ImmutableBytesPtr tenantId = tenantIdBytes == null ? null : new ImmutableBytesPtr(tenantIdBytes);
- TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
- IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
- if (indexCache == null) {
- String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion() + "host="
- + env.getRegionServerServices().getServerName();
- SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND).setMessage(msg)
- .build().buildException();
- ServerUtil.throwIOException("Index update failed", e); // will not return
- }
- return indexCache;
- }
-
- }
-
public static boolean isIndexRebuild(Map<String,byte[]> attributes) {
return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES) != null;
}
@@ -109,18 +44,21 @@ public class PhoenixIndexMetaData implements IndexMetaData {
return ReplayWrite.fromBytes(attributes.get(BaseScannerRegionObserver.REPLAY_WRITES));
}
- public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
- this.indexMetaDataCache = getIndexMetaData(env, attributes);
+ public PhoenixIndexMetaData(IndexMetaDataCache indexMetaDataCache, Map<String, byte[]> attributes) throws IOException {
+ this.indexMetaDataCache = indexMetaDataCache;
boolean isImmutable = true;
boolean hasNonPkColumns = false;
+ boolean hasLocalIndexes = false;
for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
isImmutable &= maintainer.isImmutableRows();
hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
+ hasLocalIndexes |= maintainer.isLocalIndex();
}
this.isImmutable = isImmutable;
this.hasNonPkColumns = hasNonPkColumns;
this.attributes = attributes;
this.replayWrite = getReplayWrite(attributes);
+ this.hasLocalIndexes = hasLocalIndexes;
}
public PhoenixTransactionContext getTransactionContext() {
@@ -147,6 +85,10 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public boolean isImmutableRows() {
return isImmutable;
}
+
+ public boolean hasLocalIndexes() {
+ return hasLocalIndexes;
+ }
@Override
public boolean requiresPriorRowState(Mutation m) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
new file mode 100644
index 0000000..c954cf0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.IndexMetaDataCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+
+public class PhoenixIndexMetaDataBuilder {
+ private final RegionCoprocessorEnvironment env;
+
+ PhoenixIndexMetaDataBuilder(RegionCoprocessorEnvironment env) {
+ this.env = env;
+ }
+
+ public PhoenixIndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ IndexMetaDataCache indexMetaDataCache = getIndexMetaDataCache(env, miniBatchOp.getOperation(0).getAttributesMap());
+ return new PhoenixIndexMetaData(indexMetaDataCache, miniBatchOp.getOperation(0).getAttributesMap());
+ }
+
+ private static IndexMetaDataCache getIndexMetaDataCache(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
+ if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+ byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
+ if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+ byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
+ if (md == null) {
+ md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+ }
+ if (md != null) {
+ boolean useProto = md != null;
+ byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
+ final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
+ final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState);
+ byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION);
+ final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+ return new IndexMetaDataCache() {
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return txnContext;
+ }
+
+ @Override
+ public int getClientVersion() {
+ return clientVersion;
+ }
+
+ };
+ } else {
+ byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
+ ImmutableBytesPtr tenantId = tenantIdBytes == null ? null : new ImmutableBytesPtr(tenantIdBytes);
+ TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
+ IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
+ if (indexCache == null) {
+ String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion() + "host="
+ + env.getRegionServerServices().getServerName();
+ SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND).setMessage(msg)
+ .build().buildException();
+ ServerUtil.throwIOException("Index update failed", e); // will not return
+ }
+ return indexCache;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index eaddf62..405fc0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -23,81 +23,41 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
-import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
-import org.apache.phoenix.filter.SkipScanFilter;
-import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.execute.PhoenixTxIndexMutationGenerator;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
-
/**
- * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction
+ * Do all the work of managing local index updates for a transactional table from a single coprocessor. Since the transaction
* manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a
* bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios
* are handled by aborting the transaction.
@@ -122,9 +82,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
@Override
public void start(CoprocessorEnvironment e) throws IOException {
final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
+ Configuration conf = e.getConfiguration();
String serverName = env.getRegionServerServices().getServerName().getServerName();
- codec = new PhoenixIndexCodec();
- codec.initialize(env);
+ codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName());
// Clone the config since it is shared
Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
/*
@@ -189,16 +149,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return;
}
- BatchMutateContext context = new BatchMutateContext();
- setBatchMutateContext(c, context);
-
- Map<String,byte[]> updateAttributes = m.getAttributesMap();
- PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
- if (indexMetaData.getClientVersion() >= PhoenixDatabaseMetaData.MIN_TX_CLIENT_SIDE_MAINTENANCE) {
+ PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaDataBuilder(c.getEnvironment()).getIndexMetaData(miniBatchOp);
+ if ( indexMetaData.getClientVersion() >= PhoenixDatabaseMetaData.MIN_TX_CLIENT_SIDE_MAINTENANCE
+ && !indexMetaData.hasLocalIndexes()) { // Still generate index updates server side for local indexes
super.preBatchMutate(c, miniBatchOp);
return;
}
- byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
+ BatchMutateContext context = new BatchMutateContext();
+ setBatchMutateContext(c, context);
+
Collection<Pair<Mutation, byte[]>> indexUpdates = null;
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -207,9 +166,19 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
current = NullSpan.INSTANCE;
}
- // get the index updates for all elements in this batch
- context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
-
+ RegionCoprocessorEnvironment env = c.getEnvironment();
+ PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext();
+ if (txnContext == null) {
+ throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
+ }
+ PhoenixTxIndexMutationGenerator generator = new PhoenixTxIndexMutationGenerator(env.getConfiguration(), indexMetaData,
+ env.getRegionInfo().getTable().getName(),
+ env.getRegionInfo().getStartKey(),
+ env.getRegionInfo().getEndKey());
+ try (HTableInterface htable = env.getTable(env.getRegionInfo().getTable())) {
+ // get the index updates for all elements in this batch
+ context.indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
+ }
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
} catch (Throwable t) {
@@ -259,369 +228,4 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
this.batchMutateContext.remove();
}
-
- private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
- MultiMutation stored = mutations.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutations.put(row, stored);
- }
- stored.addAll(m);
- }
-
- private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
- PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext();
- if (txnContext == null) {
- throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
- }
- boolean isRollback = txRollbackAttribute!=null;
- boolean isImmutable = indexMetaData.isImmutableRows();
- ResultScanner currentScanner = null;
- PhoenixTransactionalTable txTable = null;
- // Collect up all mutations in batch
- Map<ImmutableBytesPtr, MultiMutation> mutations =
- new HashMap<ImmutableBytesPtr, MultiMutation>();
- Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
- if (isImmutable && !isRollback) {
- findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
- } else {
- findPriorValueMutations = mutations;
- }
- // Collect the set of mutable ColumnReferences so that we can first
- // run a scan to get the current state. We'll need this to delete
- // the existing index rows.
- List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
- int estimatedSize = indexMaintainers.size() * 10;
- Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
- for (IndexMaintainer indexMaintainer : indexMaintainers) {
- // For transactional tables, we use an index maintainer
- // to aid in rollback if there's a KeyValue column in the index. The alternative would be
- // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
- // client side.
- Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
- mutableColumns.addAll(allColumns);
- }
-
- while(mutationIterator.hasNext()) {
- Mutation m = mutationIterator.next();
- // add the mutation to the batch set
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- // if we have no non PK columns, no need to find the prior values
- if (mutations != findPriorValueMutations && indexMetaData.requiresPriorRowState(m)) {
- addMutation(findPriorValueMutations, row, m);
- }
- addMutation(mutations, row, m);
- }
-
- Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
- try {
- // Track if we have row keys with Delete mutations (or Puts that are
- // Tephra's Delete marker). If there are none, we don't need to do the scan for
- // prior versions, if there are, we do. Since rollbacks always have delete mutations,
- // this logic will work there too.
- if (!findPriorValueMutations.isEmpty()) {
- List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
- for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
- }
- Scan scan = new Scan();
- // Project all mutable columns
- for (ColumnReference ref : mutableColumns) {
- scan.addColumn(ref.getFamily(), ref.getQualifier());
- }
- /*
- * Indexes inherit the storage scheme of the data table which means all the indexes have the same
- * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
- * supporting new indexes over existing data tables to have a different storage scheme than the data
- * table.
- */
- byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
-
- // Project empty key value column
- scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
- ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
- scanRanges.initializeScan(scan);
- TableName tableName = env.getRegion().getRegionInfo().getTable();
- HTableInterface htable = env.getTable(tableName);
- txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txnContext, htable);
- // For rollback, we need to see all versions, including
- // the last committed version as there may be multiple
- // checkpointed versions.
- SkipScanFilter filter = scanRanges.getSkipScanFilter();
- if (isRollback) {
- filter = new SkipScanFilter(filter,true);
- txnContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
- }
- scan.setFilter(filter);
- currentScanner = txTable.getScanner(scan);
- }
- if (isRollback) {
- processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations);
- } else {
- processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
- }
- } finally {
- if (txTable != null) txTable.close();
- }
-
- return indexUpdates;
- }
-
- private void processMutation(RegionCoprocessorEnvironment env,
- PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
- ResultScanner scanner,
- PhoenixTransactionContext txnContext,
- Set<ColumnReference> upsertColumns,
- Collection<Pair<Mutation, byte[]>> indexUpdates,
- Map<ImmutableBytesPtr, MultiMutation> mutations,
- Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
- if (scanner != null) {
- Result result;
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
- .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
- // Process existing data table rows by removing the old index row and adding the new index row
- while ((result = scanner.next()) != null) {
- Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
- TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m, emptyColRef, result);
- generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
- generatePuts(indexMetaData, indexUpdates, state);
- }
- }
- // Process new data table by adding new index rows
- for (Mutation m : mutations.values()) {
- TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m);
- generatePuts(indexMetaData, indexUpdates, state);
- generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
- }
- }
-
- private void processRollback(RegionCoprocessorEnvironment env,
- PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
- ResultScanner scanner,
- PhoenixTransactionContext tx, Set<ColumnReference> mutableColumns,
- Collection<Pair<Mutation, byte[]>> indexUpdates,
- Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
- if (scanner != null) {
- Result result;
- // Loop through last committed row state plus all new rows associated with current transaction
- // to generate point delete markers for all index rows that were added. We don't have Tephra
- // manage index rows in change sets because we don't want to be hit with the additional
- // memory hit and do not need to do conflict detection on index rows.
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
- while ((result = scanner.next()) != null) {
- Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
- // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
- // (as if we're "replaying" them in time order).
- List<Cell> cells = result.listCells();
- Collections.sort(cells, new Comparator<Cell>() {
-
- @Override
- public int compare(Cell o1, Cell o2) {
- int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
- if (c != 0) return c;
- c = o1.getTypeByte() - o2.getTypeByte();
- if (c != 0) return c;
- c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength());
- if (c != 0) return c;
- return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength());
- }
-
- });
- int i = 0;
- int nCells = cells.size();
- Result oldResult = null, newResult;
- long readPtr = tx.getReadPointer();
- do {
- boolean hasPuts = false;
- LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
- long writePtr;
- Cell cell = cells.get(i);
- do {
- hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
- writePtr = cell.getTimestamp();
- ListIterator<Cell> it = singleTimeCells.listIterator();
- do {
- // Add at the beginning of the list to match the expected HBase
- // newest to oldest sort order (which TxTableState relies on
- // with the Result.getLatestColumnValue() calls). However, we
- // still want to add Cells in the expected order for each time
- // bound as otherwise we won't find it in our old state.
- it.add(cell);
- } while (++i < nCells && (cell=cells.get(i)).getTimestamp() == writePtr);
- } while (i < nCells && cell.getTimestamp() <= readPtr);
-
- // Generate point delete markers for the prior row deletion of the old index value.
- // The write timestamp is the next timestamp, not the current timestamp,
- // as the earliest cells are the current values for the row (and we don't
- // want to delete the current row).
- if (oldResult != null) {
- TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, oldResult);
- generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
- }
- // Generate point delete markers for the new index value.
- // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
- // generate deletes. We would have generated the delete above based on the state
- // of the previous row. The delete markers do not give us the state we need to
- // delete.
- if (hasPuts) {
- newResult = Result.create(singleTimeCells);
- // First row may represent the current state which we don't want to delete
- if (writePtr > readPtr) {
- TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, newResult);
- generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
- }
- oldResult = newResult;
- } else {
- oldResult = null;
- }
- } while (i < nCells);
- }
- }
- }
-
- private void generateDeletes(PhoenixIndexMetaData indexMetaData,
- Collection<Pair<Mutation, byte[]>> indexUpdates,
- byte[] attribValue, TxTableState state) throws IOException {
- Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
- for (IndexUpdate delete : deletes) {
- if (delete.isValid()) {
- delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
- indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
- }
- }
- }
-
- private boolean generatePuts(
- PhoenixIndexMetaData indexMetaData,
- Collection<Pair<Mutation, byte[]>> indexUpdates,
- TxTableState state)
- throws IOException {
- state.applyMutation();
- Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
- boolean validPut = false;
- for (IndexUpdate put : puts) {
- if (put.isValid()) {
- indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName()));
- validPut = true;
- }
- }
- return validPut;
- }
-
-
- private static class TxTableState implements TableState {
- private final Mutation mutation;
- private final long currentTimestamp;
- private final RegionCoprocessorEnvironment env;
- private final Map<String, byte[]> attributes;
- private final List<KeyValue> pendingUpdates;
- private final Set<ColumnReference> indexedColumns;
- private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
-
- private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation mutation) {
- this.env = env;
- this.currentTimestamp = currentTimestamp;
- this.indexedColumns = indexedColumns;
- this.attributes = attributes;
- this.mutation = mutation;
- int estimatedSize = indexedColumns.size();
- this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
- this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
- try {
- CellScanner scanner = mutation.cellScanner();
- while (scanner.advance()) {
- Cell cell = scanner.current();
- pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
- }
- } catch (IOException e) {
- throw new RuntimeException(e); // Impossible
- }
- }
-
- public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
- this(env, indexedColumns, attributes, currentTimestamp, m);
-
- for (ColumnReference ref : indexedColumns) {
- Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
- if (cell != null) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- valueMap.put(ref, ptr);
- }
- }
- }
-
- @Override
- public RegionCoprocessorEnvironment getEnvironment() {
- return env;
- }
-
- @Override
- public long getCurrentTimestamp() {
- return currentTimestamp;
- }
-
-
- @Override
- public byte[] getCurrentRowKey() {
- return mutation.getRow();
- }
-
- @Override
- public List<? extends IndexedColumnGroup> getIndexColumnHints() {
- return Collections.emptyList();
- }
-
- private void applyMutation() {
- for (Cell cell : pendingUpdates) {
- if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
- ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- valueMap.remove(ref);
- } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
- for (ColumnReference ref : indexedColumns) {
- if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
- valueMap.remove(ref);
- }
- }
- } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
- ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
- if (indexedColumns.contains(ref)) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- valueMap.put(ref, ptr);
- }
- } else {
- throw new IllegalStateException("Unexpected mutation type for " + cell);
- }
- }
- }
-
- @Override
- public Collection<KeyValue> getPendingUpdate() {
- return pendingUpdates;
- }
-
- @Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
- throws IOException {
- // TODO: creating these objects over and over again is wasteful
- ColumnTracker tracker = new ColumnTracker(indexedColumns);
- ValueGetter getter = new ValueGetter() {
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
- return valueMap.get(ref);
- }
-
- @Override
- public byte[] getRowKey() {
- return mutation.getRow();
- }
-
- };
- Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
- return pair;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6bff885..1899e37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -177,6 +177,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixTransactionalIndexer;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -850,11 +851,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
&& !SchemaUtil.isMetaTable(tableName)
&& !SchemaUtil.isStatsTable(tableName)) {
if (isTransactional) {
+ if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+ descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
+ }
// For alter table, remove non transactional index coprocessor
if (descriptor.hasCoprocessor(Indexer.class.getName())) {
descriptor.removeCoprocessor(Indexer.class.getName());
}
} else {
+ // If exception on alter table to transition back to non transactional
+ if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
+ descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
+ }
if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index d4553ec..110868e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -128,7 +128,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
}
@Override
- public BaseRegionObserver getCoProcessor() {
+ public BaseRegionObserver getCoprocessor() {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index d335692..52ff2f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -166,7 +166,7 @@ public interface PhoenixTransactionContext {
*
* @return the coprocessor
*/
- public BaseRegionObserver getCoProcessor();
+ public BaseRegionObserver getCoprocessor();
/**
*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 7515a9c..77c3ab6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -423,7 +423,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
}
@Override
- public BaseRegionObserver getCoProcessor() {
+ public BaseRegionObserver getCoprocessor() {
return new TransactionProcessor();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
index 3f6a552..a668c21 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
@@ -16,12 +16,12 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
@@ -52,8 +52,8 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
}
@Override
- public void initialize(RegionCoprocessorEnvironment env) {
- groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
+ public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
+ groups = CoveredColumnIndexSpecifierBuilder.getColumns(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index 7d31516..204b1a0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -17,13 +17,12 @@
*/
package org.apache.phoenix.hbase.index.covered;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
/**
@@ -59,7 +58,7 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
}
@Override
- public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+ public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
// noop
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 052930d..82f3c3c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -97,7 +97,7 @@ public class LocalTableStateTest {
LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(env, state, m);
+ LocalTableState table = new LocalTableState(state, m);
//add the kvs from the mutation
table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
@@ -143,7 +143,7 @@ public class LocalTableStateTest {
Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(env, state, m);
+ LocalTableState table = new LocalTableState(state, m);
//add the kvs from the mutation
table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
@@ -180,7 +180,7 @@ public class LocalTableStateTest {
Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(env, state, m);
+ LocalTableState table = new LocalTableState(state, m);
//add the kvs from the mutation
table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
@@ -222,7 +222,7 @@ public class LocalTableStateTest {
}
});
LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(env, state, m);
+ LocalTableState table = new LocalTableState(state, m);
// add the kvs from the mutation
KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
kv.setSequenceId(0);
@@ -270,7 +270,7 @@ public class LocalTableStateTest {
LocalHBaseState state = new LocalTable(env);
Put pendingUpdate = new Put(row);
pendingUpdate.add(fam, qual, ts, val);
- LocalTableState table = new LocalTableState(env, state, pendingUpdate);
+ LocalTableState table = new LocalTableState(state, pendingUpdate);
// do the lookup for the given column
ColumnReference col = new ColumnReference(fam, qual);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index d94cce0..f587e98 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
@@ -140,9 +141,11 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
// the following is called by PhoenixIndexCodec#getIndexUpserts() , getIndexDeletes()
HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class);
+ Mockito.when(env.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.getStartKey()).thenReturn(Bytes.toBytes("a"));
Mockito.when(mockRegionInfo.getEndKey()).thenReturn(Bytes.toBytes("z"));
+ Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf(TEST_TABLE_STRING));
mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index 52a238f..5cc6ada 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -176,7 +176,7 @@ public class TestCoveredColumnIndexCodec {
p.add(kv);
// check the codec for deletes it should send
- LocalTableState state = new LocalTableState(env, table, p);
+ LocalTableState state = new LocalTableState(table, p);
Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
.isValid());
@@ -204,7 +204,7 @@ public class TestCoveredColumnIndexCodec {
// setup the next batch of 'current state', basically just ripping out the current state from
// the last round
table = new SimpleTableState(new Result(kvs));
- state = new LocalTableState(env, table, d);
+ state = new LocalTableState(table, d);
state.setCurrentTimestamp(2);
// check the cleanup of the current table, after the puts (mocking a 'next' update)
updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
@@ -233,7 +233,7 @@ public class TestCoveredColumnIndexCodec {
private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<KeyValue> currentState,
Delete d) throws IOException {
LocalHBaseState table = new SimpleTableState(new Result(currentState));
- LocalTableState state = new LocalTableState(env, table, d);
+ LocalTableState state = new LocalTableState(table, d);
state.setCurrentTimestamp(d.getTimeStamp());
// now we shouldn't see anything when getting the index update
state.addPendingUpdates(d.getFamilyMap().get(FAMILY));
[2/4] phoenix git commit: PHOENIX-4659 Use coprocessor API to write
local transactional indexes
Posted by ja...@apache.org.
PHOENIX-4659 Use coprocessor API to write local transactional indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b61e72f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b61e72f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b61e72f0
Branch: refs/heads/4.x-HBase-1.3
Commit: b61e72f007e46199ef27bd3308bec4f4bf448c67
Parents: 5814fcb
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 14:18:59 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Mar 22 09:45:35 2018 -0700
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 7 ++-
.../end2end/index/txn/TxWriteFailureIT.java | 6 +--
.../index/PhoenixTransactionalIndexer.java | 29 +++++++++++-
.../phoenix/transaction/TransactionFactory.java | 13 ++++++
.../apache/phoenix/util/TransactionUtil.java | 47 ++++++++++++++++++++
5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 38bde7f..dfbaf3f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -469,6 +469,7 @@ public class MutableIndexFailureIT extends BaseTest {
// uncommitted data when the DELETE is executed.
FailingRegionObserver.FAIL_WRITE = true;
try {
+ FailingRegionObserver.FAIL_NEXT_WRITE = localIndex && transactional;
conn.commit();
if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) {
fail();
@@ -504,13 +505,17 @@ public class MutableIndexFailureIT extends BaseTest {
public static class FailingRegionObserver extends SimpleRegionObserver {
public static volatile boolean FAIL_WRITE = false;
+ public static volatile boolean FAIL_NEXT_WRITE = false;
public static final String FAIL_INDEX_NAME = "FAIL_IDX";
public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
boolean throwException = false;
- if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
+ if (FAIL_NEXT_WRITE) {
+ throwException = true;
+ FAIL_NEXT_WRITE = false;
+ } else if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
&& FAIL_WRITE) {
throwException = true;
} else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
index ec60151..049611c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -105,7 +105,9 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
@Test
public void testIndexTableWriteFailure() throws Exception {
- helpTestWriteFailure(true);
+ if (!localIndex) { // We cannot fail the index write for local indexes because of the way they're written
+ helpTestWriteFailure(true);
+ }
}
@Test
@@ -175,8 +177,6 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
assertEquals("k3", rs.getString(1));
assertEquals("v3", rs.getString(2));
assertFalse(rs.next());
-
- conn.createStatement().execute("DROP TABLE " + dataTableFullName);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 405fc0c..f0b2678 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -23,9 +23,11 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -55,6 +58,7 @@ import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
/**
* Do all the work of managing local index updates for a transactional table from a single coprocessor. Since the transaction
@@ -177,8 +181,29 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
env.getRegionInfo().getEndKey());
try (HTableInterface htable = env.getTable(env.getRegionInfo().getTable())) {
// get the index updates for all elements in this batch
- context.indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
+ indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
}
+ byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+ Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+ List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+ while(indexUpdatesItr.hasNext()) {
+ Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+ if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
+ // These mutations will not go through the preDelete hooks, so we
+ // must manually convert them here.
+ Mutation mutation = TransactionUtil.convertIfDelete(next.getFirst());
+ localUpdates.add(mutation);
+ indexUpdatesItr.remove();
+ }
+ }
+ if (!localUpdates.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(0,
+ localUpdates.toArray(new Mutation[localUpdates.size()]));
+ }
+ if (!indexUpdates.isEmpty()) {
+ context.indexUpdates = indexUpdates;
+ }
+
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
} catch (Throwable t) {
@@ -204,7 +229,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
if (success) { // if miniBatchOp was successfully written, write index updates
if (!context.indexUpdates.isEmpty()) {
- this.writer.write(context.indexUpdates, true);
+ this.writer.write(context.indexUpdates, false);
}
current.addTimelineAnnotation("Wrote index updates");
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 8b3fc1d..37050fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -19,8 +19,13 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.tephra.TxConstants;
public class TransactionFactory {
@@ -140,4 +145,12 @@ public class TransactionFactory {
return table;
}
+
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+ return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+ return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index a99c700..9cd5829 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,12 +17,18 @@
*/
package org.apache.phoenix.util;
+import java.io.IOException;
import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -89,4 +95,45 @@ public class TransactionUtil {
timestamp = convertToMilliseconds(mutationState.getInitialWritePointer());
return timestamp;
}
+
+ // Convert HBase Delete into Put so that it can be undone if transaction is rolled back
+ public static Mutation convertIfDelete(Mutation mutation) throws IOException {
+ if (mutation instanceof Delete) {
+ Put deleteMarker = null;
+ for (byte[] family : mutation.getFamilyCellMap().keySet()) {
+ List<Cell> familyCells = mutation.getFamilyCellMap().get(family);
+ if (familyCells.size() == 1) {
+ if (CellUtil.isDeleteFamily(familyCells.get(0))) {
+ if (deleteMarker == null) {
+ deleteMarker = new Put(mutation.getRow());
+ }
+ deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteFamilyMarker(
+ deleteMarker.getRow(),
+ family,
+ familyCells.get(0).getTimestamp()));
+ }
+ } else {
+ for (Cell cell : familyCells) {
+ if (CellUtil.isDeleteColumns(cell)) {
+ if (deleteMarker == null) {
+ deleteMarker = new Put(mutation.getRow());
+ }
+ deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteColumnMarker(
+ deleteMarker.getRow(),
+ family,
+ CellUtil.cloneQualifier(cell),
+ cell.getTimestamp()));
+ }
+ }
+ }
+ if (deleteMarker != null) {
+ for (Map.Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+ deleteMarker.setAttribute(entry.getKey(), entry.getValue());
+ }
+ mutation = deleteMarker;
+ }
+ }
+ }
+ return mutation;
+ }
}