You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/08/05 21:03:35 UTC
[36/50] [abbrv] phoenix git commit: PHOENIX-3128 Remove extraneous
operations during upsert with local immutable index
PHOENIX-3128 Remove extraneous operations during upsert with local immutable index
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1d4a801
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1d4a801
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1d4a801
Branch: refs/heads/encodecolumns
Commit: d1d4a801265788919a6958e3585a137ec6f71eb2
Parents: 446c58b
Author: James Taylor <ja...@apache.org>
Authored: Tue Aug 2 12:03:33 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Aug 2 17:03:53 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/DistinctPrefixFilterIT.java | 11 ++-
.../apache/phoenix/end2end/index/IndexIT.java | 39 +++++++++
.../compile/PostLocalIndexDDLCompiler.java | 15 ++--
.../apache/phoenix/execute/MutationState.java | 11 ++-
.../hbase/index/covered/IndexMetaData.java | 9 +-
.../hbase/index/covered/NonTxIndexBuilder.java | 6 +-
.../phoenix/index/PhoenixIndexMetaData.java | 12 ++-
.../index/PhoenixTransactionalIndexer.java | 89 +++++++++++++-------
.../org/apache/phoenix/schema/PTableImpl.java | 13 ++-
.../apache/phoenix/util/TransactionUtil.java | 7 ++
10 files changed, 164 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
index 203d51e..9d31070 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
@@ -88,6 +88,15 @@ public class DistinctPrefixFilterIT extends BaseHBaseManagedTimeTableReuseIT {
insertPrefixV("3", "1");
insertPrefixV("3", "2");
insertPrefixV("3", "3");
+ conn.commit();
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ count(*) from " + testTableV);
+ assertTrue(rs.next());
+ long count1 = rs.getLong(1);
+ rs = conn.createStatement().executeQuery("select count(*) from " + testTableV + "_idx");
+ assertTrue(rs.next());
+ long count2 = rs.getLong(1);
+ assertEquals(count1,count2);
multiply();
multiply();
@@ -258,7 +267,7 @@ public class DistinctPrefixFilterIT extends BaseHBaseManagedTimeTableReuseIT {
testCommonDistinct(testTableF);
testCommonDistinct(testTableV);
-}
+ }
private void testCommonDistinct(String testTable) throws Exception {
testSkipRange("SELECT %s DISTINCT prefix1 FROM " + testTable, 4);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index 35a0aad..df45ecb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
@@ -39,7 +40,15 @@ import java.util.Collection;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.coprocessor.generated.PTableProtos.PTableType;
@@ -56,6 +65,7 @@ import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -63,6 +73,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -871,6 +882,7 @@ public class IndexIT extends BaseHBaseManagedTimeTableReuseIT {
conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" VARCHAR)"+tableDDLOptions);
query = "SELECT * FROM "+fullTableName;
rs = conn.createStatement().executeQuery(query);
+ long ts = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,fullTableName)).getTimeStamp();
assertFalse(rs.next());
conn.createStatement().execute(
"CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "(\"v2\") INCLUDE (\"V1\")");
@@ -941,9 +953,36 @@ public class IndexIT extends BaseHBaseManagedTimeTableReuseIT {
assertEquals("2",rs.getString(5));
assertEquals("2",rs.getString("v2"));
assertFalse(rs.next());
+
+ assertNoIndexDeletes(conn, ts, fullIndexName);
}
}
+ /**
+ * Asserts that every cell found by a raw scan of the physical index table is a Put,
+ * i.e. that index maintenance wrote no delete markers. The check is skipped entirely
+ * when this test instance runs with mutable rows.
+ *
+ * @param conn connection, unwrapped to a PhoenixConnection to reach the index table
+ * @param minTimestamp lower bound of the scanned time range; converted with
+ * TransactionUtil.convertToNanoseconds when the test is transactional
+ * @param fullIndexName name used to look up the index PTable
+ * @throws IOException propagated from opening or scanning the index table
+ * @throws SQLException propagated from unwrapping the connection or resolving the index
+ */
+ private void assertNoIndexDeletes(Connection conn, long minTimestamp, String fullIndexName) throws IOException, SQLException {
+ if (!this.mutable) {
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ PTable index = pconn.getTable(new PTableKey(null, fullIndexName));
+ byte[] physicalIndexTable = index.getPhysicalName().getBytes();
+ try (HTableInterface hIndex = pconn.getQueryServices().getTable(physicalIndexTable)) {
+ Scan scan = new Scan();
+ // raw mode makes the scan return delete markers instead of hiding them
+ scan.setRaw(true);
+ if (this.transactional) {
+ minTimestamp = TransactionUtil.convertToNanoseconds(minTimestamp);
+ }
+ scan.setTimeRange(minTimestamp, HConstants.LATEST_TIMESTAMP);
+ // the scanner is closed here too, so a failed assertion cannot leak it
+ try (ResultScanner scanner = hIndex.getScanner(scan)) {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ CellScanner cellScanner = result.cellScanner();
+ while (cellScanner.advance()) {
+ Cell current = cellScanner.current();
+ assertEquals (KeyValue.Type.Put.getCode(), current.getTypeByte());
+ }
+ }
+ }
+ }
+ }
+ }
+
@Test
public void testInFilterOnIndexedTable() throws Exception {
String tableName = "TBL_" + generateRandomString();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
index 079ff5c..81dbe0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.compile;
-import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@@ -34,9 +33,9 @@ import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.ScanUtil;
import com.google.common.collect.Lists;
@@ -93,10 +92,14 @@ public class PostLocalIndexDDLCompiler {
@Override
public MutationState execute() throws SQLException {
connection.getMutationState().commitDDLFence(dataTable);
- Cell kv = plan.iterator().next().getValue(0);
- ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
- // A single Cell will be returned with the count(*) - we decode that here
- long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+ Tuple tuple = plan.iterator().next();
+ long rowCount = 0;
+ if (tuple != null) {
+ Cell kv = tuple.getValue(0);
+ ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ // A single Cell will be returned with the count(*) - we decode that here
+ rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+ }
// The contract is to return a MutationState that contains the number of rows modified. In this
// case, it's the number of rows in the data table which corresponds to the number of index
// rows that were added.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index ca026f5..ae78e97 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -552,12 +552,15 @@ public class MutationState implements SQLCloseable {
return ptr;
}
- private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
+ private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+ final long timestamp, boolean includeAllIndexes, final boolean sendAll) {
final PTable table = tableRef.getTable();
final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
- (table.isImmutableRows() || includeMutableIndexes) ?
- IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) :
- Iterators.<PTable>emptyIterator();
+ includeAllIndexes || table.isWALDisabled() ? // TODO: remove check for isWALDisabled once PHOENIX-3137 is fixed.
+ IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) :
+ table.isImmutableRows() ?
+ IndexMaintainer.enabledGlobalIndexIterator(table.getIndexes().iterator()) :
+ Iterators.<PTable>emptyIterator();
final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index ee25a40..5420013 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -18,5 +18,12 @@
package org.apache.phoenix.hbase.index.covered;
public interface IndexMetaData {
- public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {};
+ public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {
+
+ @Override
+ public boolean isImmutableRows() {
+ return false;
+ }};
+
+ public boolean isImmutableRows();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 11e7d1a..f42ea5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -98,7 +98,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
Collection<Batch> batches = createTimestampBatchesFromMutation(m);
// go through each batch of keyvalues and build separate index entries for each
- boolean cleanupCurrentState = true;
+ boolean cleanupCurrentState = !indexMetaData.isImmutableRows();
for (Batch batch : batches) {
/*
* We have to split the work between the cleanup and the update for each group because when we update the
@@ -215,7 +215,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
// determine if we need to make any cleanup given the pending update.
long batchTs = batch.getTimestamp();
state.setPendingUpdates(batch.getKvs());
- addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
+ if (!indexMetaData.isImmutableRows()) {
+ addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
+ }
// A.2 do a single pass first for the updates to the current state
state.applyPendingUpdates();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 7a67b9c..2679f1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -36,13 +36,13 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
-
import org.apache.tephra.Transaction;
public class PhoenixIndexMetaData implements IndexMetaData {
private final Map<String, byte[]> attributes;
private final IndexMetaDataCache indexMetaDataCache;
private final boolean ignoreNewerMutations;
+ private final boolean isImmutable;
private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -87,6 +87,11 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
this.indexMetaDataCache = getIndexMetaData(env, attributes);
+ boolean isImmutable = true;
+ for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
+ isImmutable &= maintainer.isImmutableRows();
+ }
+ this.isImmutable = isImmutable;
this.attributes = attributes;
this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
}
@@ -106,4 +111,9 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public boolean ignoreNewerMutations() {
return ignoreNewerMutations;
}
+
+ /**
+ * Returns the flag computed once in the constructor: true only if every index
+ * maintainer in the index metadata cache reported immutable rows.
+ */
+ @Override
+ public boolean isImmutableRows() {
+ return this.isImmutable;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index f8be3ee..c67da6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -64,7 +64,6 @@ import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.query.KeyRange;
@@ -72,10 +71,10 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
-import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.apache.tephra.Transaction;
import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TxConstants;
@@ -236,49 +235,68 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
}
+ /**
+ * Folds a mutation into the per-row batch held in the given map, creating and
+ * registering the batch the first time a row key is seen.
+ */
+ private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+ MultiMutation rowBatch = mutations.get(row);
+ if (rowBatch != null) {
+ rowBatch.addAll(m);
+ return;
+ }
+ // first mutation for this row key: register a new batch before filling it
+ rowBatch = new MultiMutation(row);
+ mutations.put(row, rowBatch);
+ rowBatch.addAll(m);
+ }
+
private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
+ Transaction tx = indexMetaData.getTransaction();
+ if (tx == null) {
+ throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ boolean isRollback = txRollbackAttribute!=null;
+ boolean isImmutable = indexMetaData.isImmutableRows();
ResultScanner currentScanner = null;
TransactionAwareHTable txTable = null;
// Collect up all mutations in batch
Map<ImmutableBytesPtr, MultiMutation> mutations =
new HashMap<ImmutableBytesPtr, MultiMutation>();
+ Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
+ if (isImmutable && !isRollback) {
+ findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
+ } else {
+ findPriorValueMutations = mutations;
+ }
while(mutationIterator.hasNext()) {
Mutation m = mutationIterator.next();
// add the mutation to the batch set
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- MultiMutation stored = mutations.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutations.put(row, stored);
+ if (mutations != findPriorValueMutations && isDeleteMutation(m)) {
+ addMutation(findPriorValueMutations, row, m);
}
- stored.addAll(m);
+ addMutation(mutations, row, m);
}
// Collect the set of mutable ColumnReferences so that we can first
// run a scan to get the current state. We'll need this to delete
// the existing index rows.
- Transaction tx = indexMetaData.getTransaction();
- if (tx == null) {
- throw new NullPointerException("Expected to find transaction in metadata for " +
- env.getRegion().getRegionInfo().getTable().getNameAsString());
- }
List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
- Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
+ int estimatedSize = indexMaintainers.size() * 10;
+ Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
for (IndexMaintainer indexMaintainer : indexMaintainers) {
// For transactional tables, we use an index maintainer
// to aid in rollback if there's a KeyValue column in the index. The alternative would be
// to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
// client side.
- mutableColumns.addAll(indexMaintainer.getAllColumns());
+ Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+ mutableColumns.addAll(allColumns);
}
- boolean isRollback = txRollbackAttribute!=null;
Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
try {
- if (!mutableColumns.isEmpty()) {
+ // Track if we have row keys with Delete mutations (or Puts that are
+ // Tephra's Delete marker). If there are none, we don't need to do the scan for
+ // prior versions, if there are, we do. Since rollbacks always have delete mutations,
+ // this logic will work there too.
+ if (!findPriorValueMutations.isEmpty()) {
List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
- for (ImmutableBytesPtr ptr : mutations.keySet()) {
+ for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
}
Scan scan = new Scan();
@@ -306,9 +324,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
currentScanner = txTable.getScanner(scan);
}
if (isRollback) {
- processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates);
+ processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations);
} else {
- processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates);
+ processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
}
} finally {
if (txTable != null) txTable.close();
@@ -317,26 +335,39 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return indexUpdates;
}
+ /**
+ * Returns true as soon as one cell of the mutation is either not of type Put or is
+ * reported as a delete by TransactionUtil.isDelete; returns false if no cell qualifies.
+ */
+ private static boolean isDeleteMutation(Mutation m) {
+ for (List<Cell> familyCells : m.getFamilyCellMap().values()) {
+ for (Cell cell : familyCells) {
+ boolean isPut = cell.getTypeByte() == KeyValue.Type.Put.getCode();
+ if (!isPut || TransactionUtil.isDelete(cell)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
private void processMutation(RegionCoprocessorEnvironment env,
PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
ResultScanner scanner,
- Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
- Set<ColumnReference> mutableColumns,
- Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+ Transaction tx,
+ Set<ColumnReference> upsertColumns,
+ Collection<Pair<Mutation, byte[]>> indexUpdates,
+ Map<ImmutableBytesPtr, MultiMutation> mutations,
+ Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
if (scanner != null) {
Result result;
ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
// Process existing data table rows by removing the old index row and adding the new index row
while ((result = scanner.next()) != null) {
- Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
- TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
+ Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
+ TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
generatePuts(indexMetaData, indexUpdates, state);
}
}
// Process new data table by adding new index rows
for (Mutation m : mutations.values()) {
- TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
+ TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
generatePuts(indexMetaData, indexUpdates, state);
}
}
@@ -344,9 +375,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private void processRollback(RegionCoprocessorEnvironment env,
PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
ResultScanner scanner,
- Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
- Set<ColumnReference> mutableColumns,
- Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+ Transaction tx, Set<ColumnReference> mutableColumns,
+ Collection<Pair<Mutation, byte[]>> indexUpdates,
+ Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
if (scanner != null) {
Result result;
// Loop through last committed row state plus all new rows associated with current transaction
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index ec09992..847979a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -834,12 +834,17 @@ public class PTableImpl implements PTable {
// we're using the Tephra column family delete marker here to prevent the translation
// of deletes to puts by the Tephra's TransactionProcessor
if (PTableImpl.this.isTransactional()) {
- Put delete = new Put(key);
- for (PColumnFamily colFamily : families) {
- delete.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+ Put put = new Put(key);
+ if (families.isEmpty()) {
+ put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
HConstants.EMPTY_BYTE_ARRAY);
+ } else {
+ for (PColumnFamily colFamily : families) {
+ put.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+ HConstants.EMPTY_BYTE_ARRAY);
+ }
}
- deleteRow = delete;
+ deleteRow = put;
} else {
Delete delete = new Delete(key);
for (PColumnFamily colFamily : families) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 1dcf9d3..0e044b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.util;
import java.sql.SQLException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -35,6 +38,10 @@ public class TransactionUtil {
private TransactionUtil() {
}
+ /**
+ * Returns true if the cell's value matches HConstants.EMPTY_BYTE_ARRAY.
+ * NOTE(review): presumably an empty value is how Tephra marks a delete written as a Put - confirm.
+ */
+ public static boolean isDelete(Cell cell) {
+ final boolean hasEmptyValue = CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY);
+ return hasEmptyValue;
+ }
+
public static long convertToNanoseconds(long serverTimeStamp) {
return serverTimeStamp * TxConstants.MAX_TX_PER_MS;
}