You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/10/20 19:07:57 UTC
phoenix git commit: PHOENIX-4280 Delete doesn't work when immutable indexes are in building state
Repository: phoenix
Updated Branches:
refs/heads/master e50b357a0 -> dc432b975
PHOENIX-4280 Delete doesn't work when immutable indexes are in building state
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dc432b97
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dc432b97
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dc432b97
Branch: refs/heads/master
Commit: dc432b9754d028d0d36652c69b25e0fdb735b3fa
Parents: e50b357
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Oct 19 17:52:29 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 20 12:06:47 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/DropColumnIT.java | 3 +-
.../phoenix/end2end/index/ImmutableIndexIT.java | 105 ++++++++++++++++-
.../end2end/index/IndexMaintenanceIT.java | 7 +-
.../apache/phoenix/compile/DeleteCompiler.java | 18 ++-
.../hbase/index/builder/BaseIndexCodec.java | 33 +++---
.../hbase/index/covered/IndexMetaData.java | 13 ++-
.../hbase/index/covered/LocalTableState.java | 69 ++++++-----
.../hbase/index/covered/NonTxIndexBuilder.java | 115 +------------------
.../hbase/index/scanner/ScannerBuilder.java | 2 +-
.../hbase/index/util/IndexManagementUtil.java | 2 -
.../apache/phoenix/index/IndexMaintainer.java | 29 ++++-
.../phoenix/index/PhoenixIndexMetaData.java | 14 ++-
.../index/PhoenixTransactionalIndexer.java | 34 ++----
.../index/covered/LocalTableStateTest.java | 31 ++---
.../index/covered/NonTxIndexBuilderTest.java | 2 +-
15 files changed, 255 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
index 4f6c37e..badb2a6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@@ -294,7 +295,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
if (!mutable && columnEncoded) {
KeyValueColumnExpression colExpression = new SingleCellColumnExpression(localIndexCol, "0:V2", localIndexTable.getEncodingScheme());
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
- colExpression.evaluate(new ResultTuple(result), ptr);
+ assertTrue(colExpression.evaluate(new ResultTuple(result), ptr));
colValue = ptr.copyBytesIfNecessary();
}
else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 4c43068..9eb5440 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -29,6 +29,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -40,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -47,12 +49,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
@@ -146,8 +151,13 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
try {
conn.createStatement().execute(dml);
- fail();
+ if (!localIndex) {
+ fail();
+ }
} catch (SQLException e) {
+ if (localIndex) {
+ throw e;
+ }
assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(),
e.getErrorCode());
}
@@ -156,6 +166,99 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ @Test
+ public void testDeleteFromPartialPK() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IND_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+ String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl =
+ "CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+ populateTestTable(fullTableName);
+ ddl =
+ "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON "
+ + fullTableName + " (char_pk, varchar_pk)";
+ stmt.execute(ddl);
+
+ ResultSet rs;
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+
+ String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'";
+ conn.createStatement().execute(dml);
+ assertIndexMutations(conn);
+ conn.commit();
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ }
+ }
+
+ @Test
+ public void testDeleteFromNonPK() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IND_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+ String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl =
+ "CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+ populateTestTable(fullTableName);
+ ddl =
+ "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON "
+ + fullTableName + " (varchar_col1, varchar_pk)";
+ stmt.execute(ddl);
+
+ ResultSet rs;
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+
+ String dml = "DELETE from " + fullTableName + " WHERE varchar_col1='varchar_a' AND varchar_pk='varchar1'";
+ conn.createStatement().execute(dml);
+ assertIndexMutations(conn);
+ conn.commit();
+
+ TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ }
+ }
+
+ private void assertIndexMutations(Connection conn) throws SQLException {
+ Iterator<Pair<byte[], List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
+ assertTrue(iterator.hasNext());
+ iterator.next();
+ assertEquals(!localIndex, iterator.hasNext());
+ }
+
// This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back.
@Ignore
@Test
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
index 7d02e80..d5895ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
@@ -344,13 +344,14 @@ public class IndexMaintenanceIT extends ParallelStatsDisabledIT {
String dml = "DELETE from " + fullDataTableName + " WHERE long_col2 = 2";
try {
conn.createStatement().execute(dml);
- if (!mutable) {
+ if (!mutable && !localIndex) {
fail();
}
} catch (SQLException e) {
- if (!mutable) {
- assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
+ if (mutable || localIndex) {
+ throw e;
}
+ assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
}
if (!mutable) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index be07cf4..eb252d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.ServerCacheClient;
@@ -47,6 +48,7 @@ import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -62,7 +64,6 @@ import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
@@ -75,13 +76,13 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
@@ -167,6 +168,11 @@ public class DeleteCompiler {
for (int i = 0; i < indexTableRefs.size(); i++) {
ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
rs.getCurrentRow().getKey(indexPtr);
+ // Translate the data table row to the index table row
+ if (sourceTableRef.getTable().getType() != PTableType.INDEX) {
+ IndexMaintainer maintainer = indexTableRefs.get(i).getTable().getIndexMaintainer(table, connection);
+ indexPtr.set(maintainer.buildRowKey(null, indexPtr, null, null, HConstants.LATEST_TIMESTAMP));
+ }
indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
if (mutations.size() > maxSize) {
@@ -242,12 +248,12 @@ public class DeleteCompiler {
}
- private Map<PTableKey, PTable> getNonDisabledImmutableIndexes(TableRef tableRef) {
+ private Map<PTableKey, PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) {
PTable table = tableRef.getTable();
if (table.isImmutableRows() && !table.getIndexes().isEmpty()) {
Map<PTableKey, PTable> nonDisabledIndexes = new HashMap<PTableKey, PTable>(table.getIndexes().size());
for (PTable index : table.getIndexes()) {
- if (index.getIndexState() != PIndexState.DISABLE) {
+ if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) {
nonDisabledIndexes.put(index.getKey(), index);
}
}
@@ -401,7 +407,7 @@ public class DeleteCompiler {
.setTableName(tableName).build().buildException();
}
- immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
+ immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe);
boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty();
noQueryReqd = !hasLimit;
// Can't run on same server for transactional data, as we need the row keys for the data
@@ -444,7 +450,7 @@ public class DeleteCompiler {
// of immutable indexes.
table = connection.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
tableRefToBe.setTable(table);
- immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
+ immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe);
}
} catch (MetaDataEntityNotFoundException e) {
// Catch column/column family not found exception, as our meta data may
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
index 1ce4e2e..cf6e95e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -23,25 +23,22 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
-/**
- *
- */
public abstract class BaseIndexCodec implements IndexCodec {
- @Override
- public void initialize(RegionCoprocessorEnvironment env) throws IOException {
- // noop
- }
+ @Override
+ public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+ // noop
+ }
- /**
- * {@inheritDoc}
- * <p>
- * By default, the codec is always enabled. Subclasses should override this method if they want do
- * decide to index on a per-mutation basis.
- * @throws IOException
- */
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- return true;
- }
+ /**
+ * {@inheritDoc}
+ * <p>
+     * By default, the codec is always enabled. Subclasses should override this method if they want to
+ * decide to index on a per-mutation basis.
+ * @throws IOException
+ */
+ @Override
+ public boolean isEnabled(Mutation m) throws IOException {
+ return true;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index 5314631..20ed855 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -17,14 +17,15 @@
*/
package org.apache.phoenix.hbase.index.covered;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
public interface IndexMetaData {
public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {
@Override
- public boolean isImmutableRows() {
- return false;
+ public boolean requiresPriorRowState(Mutation m) {
+ return true;
}
@Override
@@ -32,7 +33,13 @@ public interface IndexMetaData {
return null;
}};
- public boolean isImmutableRows();
+
+ /**
+ * Determines whether or not we need to look up the old row to retrieve old row values for maintaining the index.
+ * @param m mutation being performed on the data table
+ * @return true if prior row state is required and false otherwise
+ */
+ public boolean requiresPriorRowState(Mutation m);
public ReplayWrite getReplayWrite();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 0f5a9f9..f7784e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -164,8 +164,27 @@ public class LocalTableState implements TableState {
* @throws IOException
*/
public Pair<CoveredDeleteScanner, IndexUpdate> getIndexedColumnsTableState(
- Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData) throws IOException {
- ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations, indexMetaData);
+ Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean isStateForDeletes, IndexMetaData indexMetaData) throws IOException {
+ // check to see if we haven't initialized any columns yet
+ Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(indexedColumns);
+
+ // add the covered columns to the set
+ for (ColumnReference ref : toCover) {
+ this.columnSet.addColumn(ref);
+ }
+ boolean requiresPriorRowState = indexMetaData.requiresPriorRowState(update);
+ if (!toCover.isEmpty()) {
+ // no need to perform scan to find prior row values when the indexed columns are immutable, as
+ // by definition, there won't be any. If we have indexed non row key columns, then we need to
+ // look up the row so that we can formulate the delete of the index row correctly. We'll always
+ // have our "empty" key value column, so we check if we have more than that as a basis for
+ // needing to lookup the prior row values.
+ if (requiresPriorRowState) {
+ // add the current state of the row. Uses listCells() to avoid a new array creation.
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
+ }
+ }
+
// filter out things with a newer timestamp and track the column references to which it applies
ColumnTracker tracker = new ColumnTracker(indexedColumns);
synchronized (this.trackedColumns) {
@@ -175,35 +194,27 @@ public class LocalTableState implements TableState {
}
}
- CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts, returnNullScannerIfRowNotFound);
-
+ CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts,
+ // If we're determining the index state for deletes and either
+ // a) we've looked up the prior row state or
+ // b) we're inserting immutable data
+ // then allow a null scanner to be returned.
+ // FIXME: this is crappy code - we need to simplify the global mutable secondary index implementation
+ // TODO: use mutable transactional secondary index implementation instead (PhoenixTransactionalIndexer)
+ isStateForDeletes && (requiresPriorRowState || insertingData(update)));
return new Pair<CoveredDeleteScanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
}
- /**
- * Initialize the managed local state. Generally, this will only be called by
- * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
- * then, there is still fairly low contention as each new Put/Delete will have its own table state.
- * @param indexMetaData TODO
- */
- private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations, IndexMetaData indexMetaData)
- throws IOException {
- // check to see if we haven't initialized any columns yet
- Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
- // we have all the columns loaded, so we are good to go.
- if (toCover.isEmpty()) { return; }
-
- // no need to perform scan to find prior row values when the indexed columns are immutable, as
- // by definition, there won't be any.
- if (!indexMetaData.isImmutableRows()) {
- // add the current state of the row. Uses listCells() to avoid a new array creation.
- this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
- }
-
- // add the covered columns to the set
- for (ColumnReference ref : toCover) {
- this.columnSet.addColumn(ref);
+
+ private static boolean insertingData(Mutation m) {
+ for (Collection<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) != KeyValue.Type.Put) {
+ return false;
+ }
+ }
}
+ return true;
}
@Override
@@ -264,9 +275,9 @@ public class LocalTableState implements TableState {
}
@Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean isStateForDeletes, IndexMetaData indexMetaData)
throws IOException {
- Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, returnNullScannerIfRowNotFound, indexMetaData);
+ Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, isStateForDeletes, indexMetaData);
ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 50e2c3f..8dd57c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -10,10 +10,8 @@
package org.apache.phoenix.hbase.index.covered;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,9 +27,6 @@ import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-
-import com.google.common.collect.Lists;
/**
* Build covered indexes for phoenix updates.
@@ -99,17 +94,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
}
}
- // go through each batch of keyvalues and build separate index entries for each
- boolean cleanupCurrentState = !indexMetaData.isImmutableRows();
- /*
- * We have to split the work between the cleanup and the update for each group because when we update the
- * current state of the row for the current batch (appending the mutations for the current batch) the next
- * group will see that as the current state, which will can cause the a delete and a put to be created for
- * the next group.
- */
- if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) {
- cleanupCurrentState = false;
- }
+ addMutationsForBatch(manager, batch, state, indexMetaData);
}
/**
@@ -138,17 +123,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
* timestamp-based batch of edits
* @param state
* local state to update and pass to the codec
- * @param requireCurrentStateCleanup
- * <tt>true</tt> if we should should attempt to cleanup the current state of the table, in the event of a
- * 'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
- * batch already did the cleanup.
* @param indexMetaData TODO
* @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
* otherwise
* @throws IOException
*/
private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
- boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException {
+ IndexMetaData indexMetaData) throws IOException {
// need a temporary manager for the current batch. It should resolve any conflicts for the
// current batch. Essentially, we can get the case where a batch doesn't change the current
@@ -160,9 +141,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
// determine if we need to make any cleanup given the pending update.
long batchTs = batch.getTimestamp();
state.setPendingUpdates(batch.getKvs());
- if (!indexMetaData.isImmutableRows()) {
- addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
- }
+ addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
// A.2 do a single pass first for the updates to the current state
state.applyPendingUpdates();
@@ -170,36 +149,6 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
// FIXME: PHOENIX-4057 do not attempt to issue index updates
// for out-of-order mutations since it corrupts the index.
return false;
-
-// long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
-// // if all the updates are the latest thing in the index, we are done - don't go and fix history
-// if (ColumnTracker.isNewestTime(minTs)) { return false; }
-//
-// // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
-// // index. after this, we have the correct view of the index, from the batch up to the index
-// while (!ColumnTracker.isNewestTime(minTs)) {
-// minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData);
-// }
-//
-// // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
-// if (requireCurrentStateCleanup) {
-// // roll back the pending update. This is needed so we can remove all the 'old' index entries.
-// // We don't need to do the puts here, but just the deletes at the given timestamps since we
-// // just want to completely hide the incorrect entries.
-// state.rollback(batch.getKvs());
-// // setup state
-// state.setPendingUpdates(batch.getKvs());
-//
-// // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
-// // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
-// // because the update may have a different set of columns or value based on the update).
-// cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData);
-//
-// // have to roll the state forward again, so the current state is correct
-// state.applyPendingUpdates();
-// return true;
-// }
-// return false;
}
private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData)
@@ -249,7 +198,6 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
*/
// timestamp of the next update we need to track
long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
- List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
for (IndexUpdate update : upserts) {
// this is the one bit where we check the timestamps
final ColumnTracker tracker = update.getIndexedColumns();
@@ -265,71 +213,17 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
continue;
}
- // track index hints for the next round. Hint if we need an update for that column for the
- // next timestamp. These columns clearly won't need to update as we go through time as they
- // already match the most recent possible thing.
- boolean needsCleanup = false;
- if (tracker.hasNewerTimestamps()) {
- columnHints.add(tracker);
- // this update also needs to be cleaned up at the next timestamp because it not the latest.
- needsCleanup = true;
- }
-
// only make the put if the index update has been setup
if (update.isValid()) {
byte[] table = update.getTableName();
Mutation mutation = update.getUpdate();
updateMap.addIndexUpdate(table, mutation);
-
- // only make the cleanup if we made a put and need cleanup
- if (needsCleanup) {
- // there is a TS for the interested columns that is greater than the columns in the
- // put. Therefore, we need to issue a delete at the same timestamp
- Delete d = new Delete(mutation.getRow());
- d.setTimestamp(tracker.getTS());
- updateMap.addIndexUpdate(table, d);
- }
}
}
return minTs;
}
/**
- * Cleanup the index based on the current state from the given batch. Iterates over each timestamp (for the indexed
- * rows) for the current state of the table and cleans up all the existing entries generated by the codec.
- * <p>
- * Adds all pending updates to the updateMap
- *
- * @param updateMap
- * updated with the pending index updates from the codec
- * @param batchTs
- * timestamp from which we should cleanup
- * @param state
- * current state of the primary table. Should already by setup to the correct state from which we want to
- * cleanup.
- * @param indexMetaData TODO
- * @throws IOException
- */
- private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
- throws IOException {
- // get the cleanup for the current state
- state.setCurrentTimestamp(batchTs);
- addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
- Set<ColumnTracker> trackers = state.getTrackedColumns();
- long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
- for (ColumnTracker tracker : trackers) {
- if (tracker.getTS() < minTs) {
- minTs = tracker.getTS();
- }
- }
- state.resetTrackedColumns();
- if (!ColumnTracker.isNewestTime(minTs)) {
- state.setHints(Lists.newArrayList(trackers));
- cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData);
- }
- }
-
- /**
* Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
* update map.
* <p>
@@ -340,9 +234,6 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
*/
protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
throws IOException {
- if (indexMetaData.isImmutableRows()) {
- return;
- }
Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
if (cleanup != null) {
for (IndexUpdate d : cleanup) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 5547958..ad09c0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -71,7 +71,7 @@ public class ScannerBuilder {
// filter out kvs based on deletes
ApplyAndFilterDeletesFilter deleteFilter = new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns));
filters.addFilter(deleteFilter);
-
+
// combine the family filters and the rest of the filters as a
return getFilteredScanner(filters, returnNullIfRowNotFound, deleteFilter.getDeleteTracker());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 697caef..a4a34a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -25,7 +25,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 83b1d58..b4566a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.ByteStringer;
@@ -1048,10 +1049,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
- private DeleteType getDeleteTypeOrNull(Collection<KeyValue> pendingUpdates) {
+ private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates) {
+ return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs);
+ }
+
+ private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates, int nCFs) {
int nDeleteCF = 0;
int nDeleteVersionCF = 0;
- for (KeyValue kv : pendingUpdates) {
+ for (Cell kv : pendingUpdates) {
if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
nDeleteVersionCF++;
}
@@ -1064,22 +1069,34 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// This is what a delete looks like on the server side for mutable indexing...
// Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
DeleteType deleteType = null;
- if (nDeleteVersionCF > 0 && nDeleteVersionCF >= this.nDataCFs) {
+ if (nDeleteVersionCF > 0 && nDeleteVersionCF >= nCFs) {
deleteType = DeleteType.SINGLE_VERSION;
} else {
int nDelete = nDeleteCF + nDeleteVersionCF;
- if (nDelete>0 && nDelete >= this.nDataCFs) {
+ if (nDelete>0 && nDelete >= nCFs) {
deleteType = DeleteType.ALL_VERSIONS;
}
}
return deleteType;
}
- public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+ public boolean isRowDeleted(Collection<? extends Cell> pendingUpdates) {
return getDeleteTypeOrNull(pendingUpdates) != null;
}
- private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates, long ts) throws IOException {
+ public boolean isRowDeleted(Mutation m) {
+ if (m.getFamilyCellMap().size() < this.nDataCFs) {
+ return false;
+ }
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ if (getDeleteTypeOrNull(cells, 1) == null) { // Checking CFs one by one
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<? extends Cell> pendingUpdates, long ts) throws IOException {
if (pendingUpdates.isEmpty()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 7908103..05371a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.IndexMetaDataCache;
@@ -43,6 +44,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
private final IndexMetaDataCache indexMetaDataCache;
private final ReplayWrite replayWrite;
private final boolean isImmutable;
+ private final boolean hasNonPkColumns;
private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -102,10 +104,13 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
this.indexMetaDataCache = getIndexMetaData(env, attributes);
boolean isImmutable = true;
+ boolean hasNonPkColumns = false;
for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
isImmutable &= maintainer.isImmutableRows();
+ hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
}
this.isImmutable = isImmutable;
+ this.hasNonPkColumns = hasNonPkColumns;
this.attributes = attributes;
this.replayWrite = getReplayWrite(attributes);
}
@@ -122,12 +127,17 @@ public class PhoenixIndexMetaData implements IndexMetaData {
return attributes;
}
+ @Override
public ReplayWrite getReplayWrite() {
return replayWrite;
}
-
- @Override
+
public boolean isImmutableRows() {
return isImmutable;
}
+
+ @Override
+ public boolean requiresPriorRowState(Mutation m) {
+ return !isImmutable || (indexMetaDataCache.getIndexMaintainers().get(0).isRowDeleted(m) && hasNonPkColumns);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index bc53b6b..3495267 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -89,7 +89,6 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TransactionUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -284,16 +283,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
} else {
findPriorValueMutations = mutations;
}
- while(mutationIterator.hasNext()) {
- Mutation m = mutationIterator.next();
- // add the mutation to the batch set
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- if (mutations != findPriorValueMutations && isDeleteMutation(m)) {
- addMutation(findPriorValueMutations, row, m);
- }
- addMutation(mutations, row, m);
- }
-
// Collect the set of mutable ColumnReferences so that we can first
// run a scan to get the current state. We'll need this to delete
// the existing index rows.
@@ -309,6 +298,17 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
mutableColumns.addAll(allColumns);
}
+ while(mutationIterator.hasNext()) {
+ Mutation m = mutationIterator.next();
+ // add the mutation to the batch set
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ // if we have no non PK columns, no need to find the prior values
+ if (mutations != findPriorValueMutations && indexMetaData.requiresPriorRowState(m)) {
+ addMutation(findPriorValueMutations, row, m);
+ }
+ addMutation(mutations, row, m);
+ }
+
Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
try {
// Track if we have row keys with Delete mutations (or Puts that are
@@ -363,17 +363,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return indexUpdates;
}
- private static boolean isDeleteMutation(Mutation m) {
- for (Map.Entry<byte[],List<Cell>> cellMap : m.getFamilyCellMap().entrySet()) {
- for (Cell cell : cellMap.getValue()) {
- if (cell.getTypeByte() != KeyValue.Type.Put.getCode() || TransactionUtil.isDelete(cell)) {
- return true;
- }
- }
- }
- return false;
- }
-
private void processMutation(RegionCoprocessorEnvironment env,
PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
ResultScanner scanner,
@@ -398,6 +387,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
for (Mutation m : mutations.values()) {
TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m);
generatePuts(indexMetaData, indexUpdates, state);
+ generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 0efb63a..052930d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,13 +58,13 @@ public class LocalTableStateTest {
private static final IndexMetaData indexMetaData = new IndexMetaData() {
@Override
- public boolean isImmutableRows() {
- return false;
+ public ReplayWrite getReplayWrite() {
+ return null;
}
@Override
- public ReplayWrite getReplayWrite() {
- return null;
+ public boolean requiresPriorRowState(Mutation m) {
+ return true;
}
};
@@ -120,14 +121,14 @@ public class LocalTableStateTest {
IndexMetaData indexMetaData = new IndexMetaData() {
@Override
- public boolean isImmutableRows() {
- return false;
- }
-
- @Override
public ReplayWrite getReplayWrite() {
return null;
}
+
+ @Override
+ public boolean requiresPriorRowState(Mutation m) {
+ return true;
+ }
};
Put m = new Put(row);
@@ -157,16 +158,16 @@ public class LocalTableStateTest {
IndexMetaData indexMetaData = new IndexMetaData() {
@Override
- public boolean isImmutableRows() {
- return true;
- }
-
- @Override
public ReplayWrite getReplayWrite() {
return null;
}
+
+ @Override
+ public boolean requiresPriorRowState(Mutation m) {
+ return false;
+ }
- };
+ };
Put m = new Put(row);
m.add(fam, qual, ts, val);
// setup mocks
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc432b97/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index d06967d..d94cce0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -145,7 +145,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
Mockito.when(mockRegionInfo.getEndKey()).thenReturn(Bytes.toBytes("z"));
mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
- Mockito.when(mockIndexMetaData.isImmutableRows()).thenReturn(false);
+ Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
Mockito.when(mockIndexMetaData.getIndexMaintainers())
.thenReturn(Collections.singletonList(getTestIndexMaintainer()));