You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/02/22 05:36:54 UTC

[GitHub] [phoenix] stoty commented on a change in pull request #1144: PHOENIX-6386 Bulkload generates unverified index rows

stoty commented on a change in pull request #1144:
URL: https://github.com/apache/phoenix/pull/1144#discussion_r579992339



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -36,16 +39,30 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;

Review comment:
       OK

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;

Review comment:
       OK

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[] emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<Cell> keyValues) {
+            for(int i=0; i < keyValues.size() ; i++) {
+                Cell kv = keyValues.get(i);

Review comment:
       I purposefully chose this construct, as we're processing a lot of objects, and this is reportedly faster and easier on memory / GC.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context context) throws IOExcep
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next();
                 List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<Cell>());
                         }
-                        List<Cell> list = map.get(i);
+                        List<Cell> cellsForTable = map.get(i);
+                        if(indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
+                        }
                         for (Cell kv : keyValueList) {
-                            list.add(kv);
+                            cellsForTable.add(kv);
                         }

Review comment:
       OK

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -318,6 +335,27 @@ public void testImportWithIndex() throws Exception {
 
         rs.close();
         stmt.close();
+
+        checkIndexTableIsVerfied("TABLE3_IDX");
+    }
+
+    private void checkIndexTableIsVerfied(String indexTableName) throws SQLException, IOException {

Review comment:
       Thanks

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context context) throws IOExcep
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next();
                 List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<Cell>());
                         }
-                        List<Cell> list = map.get(i);
+                        List<Cell> cellsForTable = map.get(i);
+                        if(indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
+                        }

Review comment:
       Strictly speaking, we are not verifying them, as we're generating the records in bulk, and we assume that they are correct (we assume that the bulk load job runs to completion).
   We're just setting the status to verified to avoid repairing the freshly generated index rows.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[] emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<Cell> keyValues) {

Review comment:
       Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org