You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/02/22 11:15:42 UTC

[phoenix] branch 4.x updated: PHOENIX-6386 Bulkload generates unverified index rows

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 358ace8  PHOENIX-6386 Bulkload generates unverified index rows
358ace8 is described below

commit 358ace857a378ce84f3552f7a64fff58c74be662
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Feb 18 11:36:31 2021 +0100

    PHOENIX-6386 Bulkload generates unverified index rows
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 39 +++++++++++++-
 .../mapreduce/FormatToBytesWritableMapper.java     | 61 +++++++++++++++++++---
 .../ImportPreUpsertKeyValueProcessor.java          |  4 +-
 3 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 2de2c82..762d613 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -21,13 +21,16 @@ import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 
@@ -36,16 +39,25 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -318,6 +330,27 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
 
         rs.close();
         stmt.close();
+
+        checkIndexTableIsVerified("TABLE3_IDX");
+    }
+
+    private void checkIndexTableIsVerified(String indexTableName) throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Table hTable = cqs.getTable(Bytes.toBytes(indexTableName));
+        PTable pTable = PhoenixRuntime.getTable(conn, indexTableName);
+
+        byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(pTable);
+        byte[] emptyKeyValueQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst();
+
+        Scan scan = new Scan();
+        scan.setFilter(new SingleColumnValueFilter(
+                emptyKeyValueCF,
+                emptyKeyValueQualifier,
+                CompareOp.NOT_EQUAL,
+                IndexRegionObserver.VERIFIED_BYTES));
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            assertNull("There are non VERIFIED rows in index", scanner.next());
+        }
     }
 
     @Test
@@ -405,6 +438,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
 
         rs.close();
         stmt.close();
+
+        if (!localIndex) {
+            checkIndexTableIsVerified(indexTableName);
+        }
     }
     
     @Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 63840e7..ecd6110 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -56,6 +57,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,6 +109,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     protected PhoenixConnection conn;
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    protected IndexStatusUpdater[] indexStatusUpdaters;
     protected List<String> tableNames;
     protected List<String> logicalNames;
     protected MapperUpsertListener<RECORD> upsertListener;
@@ -179,18 +182,19 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
                 List<KeyValue> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<KeyValue>());
                         }
-                        List<KeyValue> list = map.get(i);
-                        for (KeyValue kv : keyValueList) {
-                            list.add(kv);
+                        List<KeyValue> cellsForTable = map.get(i);
+                        if (indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
                         }
+                        cellsForTable.addAll(keyValueList);
                         break;
                     }
                 }
@@ -213,6 +217,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
      */
     private void initColumnIndexes() throws SQLException {
         columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        indexStatusUpdaters = new IndexStatusUpdater[logicalNames.size()];
         int columnIndex = 0;
         for (int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -249,6 +254,10 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
             columnIndexes.put(cfn, new Integer(columnIndex));
             columnIndex++;
+            if (PTable.IndexType.GLOBAL == table.getIndexType()) {
+                indexStatusUpdaters[index] =
+                        new IndexStatusUpdater(emptyColumnFamily, emptyKeyValue);
+            }
         }
     }
 
@@ -414,8 +423,46 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+        public List<KeyValue> preUpsert(byte[] tableName, List<KeyValue> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Sets the EMPTY cell value to VERIFIED in the cells generated for global index tables.
+     */
+    private static class IndexStatusUpdater {
+
+        private final byte[] emptyKeyValueCF;
+        private final int emptyKeyValueCFLength;
+        private final byte[] emptyKeyValueQualifier;
+        private final int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(final byte[] emptyKeyValueCF, final byte[] emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Sets the value of every EMPTY cell in the passed keyValues list to VERIFIED.
+         *
+         * @param keyValues cells to check; EMPTY cell values are overwritten in place
+         */
+        public void setVerfied(List<KeyValue> keyValues) {
+            for (int i = 0; i < keyValues.size() ; i++) {
+                Cell kv = keyValues.get(i);
+                if (CellUtil.matchingFamily(kv, emptyKeyValueCF, 0, emptyKeyValueCFLength)
+                        && CellUtil.matchingQualifier(kv, emptyKeyValueQualifier, 0, emptyKeyValueQualifierLength)) {
+                    if (kv.getValueLength() != 1) {
+                        //This should never happen. Fail fast if it does.
+                       throw new IllegalArgumentException("Empty cell value length is not 1");
+                    }
+                    //Overwrite the single value byte in place (no cell copy) for performance
+                    kv.getValueArray()[kv.getValueOffset()] = IndexRegionObserver.VERIFIED_BYTES[0];
+                }
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
index 6835b81..d6ec3d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
@@ -39,10 +39,10 @@ public interface ImportPreUpsertKeyValueProcessor {
      * Implementors can filter certain KeyValues from the list, augment the list, or return the
      * same list.
      *
-     * @param rowKey the row key for the key values that are being passed in
+     * @param tableName the table name for the key values that are being passed in
      * @param keyValues list of KeyValues that are to be written to an HFile
      * @return the list that will actually be written
      */
-    List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues);
+    List<KeyValue> preUpsert(byte[] tableName, List<KeyValue> keyValues);
 
 }