You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/05/12 00:09:20 UTC
[3/4] phoenix git commit: PHOENIX-3825 Mutable Index rebuild does not
write an index version for each data row version (Vincent Poon)
PHOENIX-3825 Mutable Index rebuild does not write an index version for each data row version (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4153c82e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4153c82e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4153c82e
Branch: refs/heads/master
Commit: 4153c82e3b6fbc49232b6cdc2a0b98d3af05648e
Parents: 1f1ecc9
Author: James Taylor <ja...@apache.org>
Authored: Thu May 11 17:02:26 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 11 17:07:30 2017 -0700
----------------------------------------------------------------------
.../hbase/index/covered/IndexMetaData.java | 9 +++++++-
.../hbase/index/covered/NonTxIndexBuilder.java | 2 +-
.../covered/example/CoveredColumnIndexer.java | 2 +-
.../covered/update/IndexUpdateManager.java | 14 ++++++++++++-
.../index/covered/TestNonTxIndexBuilder.java | 22 ++++++++++++++++++--
.../covered/update/TestIndexUpdateManager.java | 19 ++++++++++++-----
6 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index 5420013..04e2523 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -23,7 +23,14 @@ public interface IndexMetaData {
@Override
public boolean isImmutableRows() {
return false;
+ }
+
+ @Override
+ public boolean ignoreNewerMutations() {
+ return false;
}};
-
+
public boolean isImmutableRows();
+
+ public boolean ignoreNewerMutations();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index eb9dc96..e335cdc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -66,7 +66,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
// create a state manager, so we can manage each batch
LocalTableState state = new LocalTableState(env, localTable, mutation);
// build the index updates for each group
- IndexUpdateManager manager = new IndexUpdateManager();
+ IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index 60698c7..c830e54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -121,7 +121,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
// stores all the return values
- IndexUpdateManager updateMap = new IndexUpdateManager();
+ IndexUpdateManager updateMap = new IndexUpdateManager(indexMetaData);
// batch the updates by row to make life easier and ordered
Collection<Batch> batches = batchByRow(filtered);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index 5f6020a..2784f0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import com.google.common.collect.Lists;
@@ -99,6 +100,12 @@ public class IndexUpdateManager {
protected final Map<ImmutableBytesPtr, Collection<Mutation>> map =
new HashMap<ImmutableBytesPtr, Collection<Mutation>>();
+ private IndexMetaData indexMetaData;
+
+ public IndexUpdateManager(IndexMetaData indexMetaData) {
+ this.indexMetaData = indexMetaData;
+
+ }
/**
* Add an index update. Keeps the latest {@link Put} for a given timestamp
@@ -113,7 +120,12 @@ public class IndexUpdateManager {
updates = new TreeSet<Mutation>(COMPARATOR);
map.put(key, updates);
}
- fixUpCurrentUpdates(updates, m);
+ if (indexMetaData.ignoreNewerMutations()) {
+ // if we're replaying mutations, we don't need to worry about out-of-order updates
+ updates.add(m);
+ } else {
+ fixUpCurrentUpdates(updates, m);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
index d4d69b4..27e35df 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
@@ -98,6 +98,7 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
private static final byte[] VALUE_1 = Bytes.toBytes(111);
private static final byte[] VALUE_2 = Bytes.toBytes(222);
private static final byte[] VALUE_3 = Bytes.toBytes(333);
+ private static final byte[] VALUE_4 = Bytes.toBytes(444);
private static final byte PUT_TYPE = KeyValue.Type.Put.getCode();
private NonTxIndexBuilder indexBuilder;
@@ -217,6 +218,8 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
/**
* Tests a partial rebuild of a row with multiple versions. 3 versions of the row in data table,
* and we rebuild the index starting from time t=2
+ *
+ * There should be one index row version per data row version.
*/
@Test
public void testRebuildMultipleVersionRow() throws IOException {
@@ -229,11 +232,15 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
Cell currentCell1 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 1, PUT_TYPE, VALUE_1);
Cell currentCell2 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 2, PUT_TYPE, VALUE_2);
Cell currentCell3 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 3, PUT_TYPE, VALUE_3);
- setCurrentRowState(Arrays.asList(currentCell3, currentCell2, currentCell1));
+ Cell currentCell4 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 4, PUT_TYPE, VALUE_4);
+ setCurrentRowState(Arrays.asList(currentCell4, currentCell3, currentCell2, currentCell1));
// rebuilder replays mutations starting from t=2
MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
Put put = new Put(ROW);
+ put.addImmutable(FAM, INDEXED_QUALIFIER, 4, VALUE_4);
+ mutation.addAll(put);
+ put = new Put(ROW);
put.addImmutable(FAM, INDEXED_QUALIFIER, 3, VALUE_3);
mutation.addAll(put);
put = new Put(ROW);
@@ -242,11 +249,22 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
Collection<Pair<Mutation, byte[]>> indexUpdates =
indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
- assertEquals(2, indexUpdates.size());
+ // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
+ // rows for VALUE_2, VALUE_3)
+ assertEquals(6, indexUpdates.size());
+
assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
new byte[0] /* qual not needed */, 2);
assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW,
+ KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 2);
+ assertContains(indexUpdates, 3, ROW, KeyValue.Type.DeleteFamily, FAM,
+ new byte[0] /* qual not needed */, 3);
+ assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW,
KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 3);
+ assertContains(indexUpdates, 4, ROW, KeyValue.Type.DeleteFamily, FAM,
+ new byte[0] /* qual not needed */, 4);
+ assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW,
+ KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 4);
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
index a0592f3..9e50615 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Before;
import org.junit.Test;
-
+import org.mockito.Mockito;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
public class TestIndexUpdateManager {
@@ -39,10 +41,17 @@ public class TestIndexUpdateManager {
private static final byte[] row = Bytes.toBytes("row");
private static final String TABLE_NAME = "table";
private static final byte[] table = Bytes.toBytes(TABLE_NAME);
+ private IndexMetaData mockIndexMetaData;
+
+ @Before
+ public void setup() {
+ mockIndexMetaData = Mockito.mock(IndexMetaData.class);
+ Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(false);
+ }
@Test
public void testMutationComparator() throws Exception {
- IndexUpdateManager manager = new IndexUpdateManager();
+ IndexUpdateManager manager = new IndexUpdateManager(mockIndexMetaData);
Comparator<Mutation> comparator = manager.COMPARATOR;
Put p = new Put(row, 10);
// lexigraphically earlier should sort earlier
@@ -79,7 +88,7 @@ public class TestIndexUpdateManager {
*/
@Test
public void testCancelingUpdates() throws Exception {
- IndexUpdateManager manager = new IndexUpdateManager();
+ IndexUpdateManager manager = new IndexUpdateManager(mockIndexMetaData);
long ts1 = 10, ts2 = 11;
// at different timestamps, so both should be retained
@@ -106,13 +115,13 @@ public class TestIndexUpdateManager {
validate(manager, pending);
// if there is just a put and a delete at the same ts, no pending updates should be returned
- manager = new IndexUpdateManager();
+ manager = new IndexUpdateManager(mockIndexMetaData);
manager.addIndexUpdate(table, d2);
manager.addIndexUpdate(table, p);
validate(manager, Collections.<Mutation> emptyList());
// different row insertions can be tricky too, if you don't get the base cases right
- manager = new IndexUpdateManager();
+ manager = new IndexUpdateManager(mockIndexMetaData);
manager.addIndexUpdate(table, p);
// this row definitely sorts after the current row
byte[] row1 = Bytes.toBytes("row1");