You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/05/12 00:09:20 UTC

[3/4] phoenix git commit: PHOENIX-3825 Mutable Index rebuild does not write an index version for each data row version (Vincent Poon)

PHOENIX-3825 Mutable Index rebuild does not write an index version for each data row version (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4153c82e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4153c82e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4153c82e

Branch: refs/heads/master
Commit: 4153c82e3b6fbc49232b6cdc2a0b98d3af05648e
Parents: 1f1ecc9
Author: James Taylor <ja...@apache.org>
Authored: Thu May 11 17:02:26 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 11 17:07:30 2017 -0700

----------------------------------------------------------------------
 .../hbase/index/covered/IndexMetaData.java      |  9 +++++++-
 .../hbase/index/covered/NonTxIndexBuilder.java  |  2 +-
 .../covered/example/CoveredColumnIndexer.java   |  2 +-
 .../covered/update/IndexUpdateManager.java      | 14 ++++++++++++-
 .../index/covered/TestNonTxIndexBuilder.java    | 22 ++++++++++++++++++--
 .../covered/update/TestIndexUpdateManager.java  | 19 ++++++++++++-----
 6 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index 5420013..04e2523 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -23,7 +23,14 @@ public interface IndexMetaData {
         @Override
         public boolean isImmutableRows() {
             return false;
+        }
+
+        @Override
+        public boolean ignoreNewerMutations() {
+          return false;
         }};
-    
+
     public boolean isImmutableRows();
+
+    public boolean ignoreNewerMutations();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index eb9dc96..e335cdc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -66,7 +66,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
     	// create a state manager, so we can manage each batch
         LocalTableState state = new LocalTableState(env, localTable, mutation);
         // build the index updates for each group
-        IndexUpdateManager manager = new IndexUpdateManager();
+        IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
 
         batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index 60698c7..c830e54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -121,7 +121,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
       Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
     // stores all the return values
-    IndexUpdateManager updateMap = new IndexUpdateManager();
+    IndexUpdateManager updateMap = new IndexUpdateManager(indexMetaData);
     // batch the updates by row to make life easier and ordered
     Collection<Batch> batches = batchByRow(filtered);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index 5f6020a..2784f0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 import com.google.common.collect.Lists;
@@ -99,6 +100,12 @@ public class IndexUpdateManager {
 
   protected final Map<ImmutableBytesPtr, Collection<Mutation>> map =
       new HashMap<ImmutableBytesPtr, Collection<Mutation>>();
+  private IndexMetaData indexMetaData;
+
+  public IndexUpdateManager(IndexMetaData indexMetaData) {
+    this.indexMetaData = indexMetaData;
+
+  }
 
   /**
    * Add an index update. Keeps the latest {@link Put} for a given timestamp
@@ -113,7 +120,12 @@ public class IndexUpdateManager {
       updates = new TreeSet<Mutation>(COMPARATOR);
       map.put(key, updates);
     }
-    fixUpCurrentUpdates(updates, m);
+    if (indexMetaData.ignoreNewerMutations()) {
+      // if we're replaying mutations, we don't need to worry about out-of-order updates
+      updates.add(m);
+    } else {
+      fixUpCurrentUpdates(updates, m);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
index d4d69b4..27e35df 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestNonTxIndexBuilder.java
@@ -98,6 +98,7 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
     private static final byte[] VALUE_1 = Bytes.toBytes(111);
     private static final byte[] VALUE_2 = Bytes.toBytes(222);
     private static final byte[] VALUE_3 = Bytes.toBytes(333);
+    private static final byte[] VALUE_4 = Bytes.toBytes(444);
     private static final byte PUT_TYPE = KeyValue.Type.Put.getCode();
 
     private NonTxIndexBuilder indexBuilder;
@@ -217,6 +218,8 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
     /**
      * Tests a partial rebuild of a row with multiple versions. 3 versions of the row in data table,
      * and we rebuild the index starting from time t=2
+     *
+     * There should be one index row version per data row version.
      */
     @Test
     public void testRebuildMultipleVersionRow() throws IOException {
@@ -229,11 +232,15 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
         Cell currentCell1 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 1, PUT_TYPE, VALUE_1);
         Cell currentCell2 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 2, PUT_TYPE, VALUE_2);
         Cell currentCell3 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 3, PUT_TYPE, VALUE_3);
-        setCurrentRowState(Arrays.asList(currentCell3, currentCell2, currentCell1));
+        Cell currentCell4 = CellUtil.createCell(ROW, FAM, INDEXED_QUALIFIER, 4, PUT_TYPE, VALUE_4);
+        setCurrentRowState(Arrays.asList(currentCell4, currentCell3, currentCell2, currentCell1));
 
         // rebuilder replays mutations starting from t=2
         MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
         Put put = new Put(ROW);
+        put.addImmutable(FAM, INDEXED_QUALIFIER, 4, VALUE_4);
+        mutation.addAll(put);
+        put = new Put(ROW);
         put.addImmutable(FAM, INDEXED_QUALIFIER, 3, VALUE_3);
         mutation.addAll(put);
         put = new Put(ROW);
@@ -242,11 +249,22 @@ public class TestNonTxIndexBuilder extends BaseConnectionlessQueryTest {
 
         Collection<Pair<Mutation, byte[]>> indexUpdates =
                 indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
-        assertEquals(2, indexUpdates.size());
+        // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
+        // rows for VALUE_2, VALUE_3)
+        assertEquals(6, indexUpdates.size());
+
         assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
             new byte[0] /* qual not needed */, 2);
         assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW,
+            KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 2);
+        assertContains(indexUpdates, 3, ROW, KeyValue.Type.DeleteFamily, FAM,
+            new byte[0] /* qual not needed */, 3);
+        assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW,
             KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 3);
+        assertContains(indexUpdates, 4, ROW, KeyValue.Type.DeleteFamily, FAM,
+            new byte[0] /* qual not needed */, 4);
+        assertContains(indexUpdates, ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP, ROW,
+            KeyValue.Type.Put, FAM, QueryConstants.EMPTY_COLUMN_BYTES, 4);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4153c82e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
index a0592f3..9e50615 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Before;
 import org.junit.Test;
-
+import org.mockito.Mockito;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 
 public class TestIndexUpdateManager {
@@ -39,10 +41,17 @@ public class TestIndexUpdateManager {
   private static final byte[] row = Bytes.toBytes("row");
   private static final String TABLE_NAME = "table";
   private static final byte[] table = Bytes.toBytes(TABLE_NAME);
+  private IndexMetaData mockIndexMetaData;
+
+  @Before
+  public void setup() {
+    mockIndexMetaData = Mockito.mock(IndexMetaData.class);
+    Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(false);
+  }
 
   @Test
   public void testMutationComparator() throws Exception {
-    IndexUpdateManager manager = new IndexUpdateManager();
+    IndexUpdateManager manager = new IndexUpdateManager(mockIndexMetaData);
     Comparator<Mutation> comparator = manager.COMPARATOR;
     Put p = new Put(row, 10);
     // lexigraphically earlier should sort earlier
@@ -79,7 +88,7 @@ public class TestIndexUpdateManager {
    */
   @Test
   public void testCancelingUpdates() throws Exception {
-    IndexUpdateManager manager = new IndexUpdateManager();
+    IndexUpdateManager manager = new IndexUpdateManager(mockIndexMetaData);
 
     long ts1 = 10, ts2 = 11;
     // at different timestamps, so both should be retained
@@ -106,13 +115,13 @@ public class TestIndexUpdateManager {
     validate(manager, pending);
 
     // if there is just a put and a delete at the same ts, no pending updates should be returned
-    manager = new IndexUpdateManager();
+    manager = new IndexUpdateManager(mockIndexMetaData);
     manager.addIndexUpdate(table, d2);
     manager.addIndexUpdate(table, p);
     validate(manager, Collections.<Mutation> emptyList());
 
     // different row insertions can be tricky too, if you don't get the base cases right
-    manager = new IndexUpdateManager();
+    manager = new IndexUpdateManager(mockIndexMetaData);
     manager.addIndexUpdate(table, p);
     // this row definitely sorts after the current row
     byte[] row1 = Bytes.toBytes("row1");