You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/02/21 14:24:50 UTC

[GitHub] [phoenix] virajjasani commented on a change in pull request #1144: PHOENIX-6386 Bulkload generates unverified index rows

virajjasani commented on a change in pull request #1144:
URL: https://github.com/apache/phoenix/pull/1144#discussion_r579813022



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[] emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<Cell> keyValues) {

Review comment:
       nit: typo in the method name — `s/setVerfied/setVerified/`

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;

Review comment:
       nit: it would be good to mark all of these fields `final`

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -318,6 +335,27 @@ public void testImportWithIndex() throws Exception {
 
         rs.close();
         stmt.close();
+
+        checkIndexTableIsVerfied("TABLE3_IDX");
+    }
+
+    private void checkIndexTableIsVerfied(String indexTableName) throws SQLException, IOException {

Review comment:
       nit: typo in the method name — `s/checkIndexTableIsVerfied/checkIndexTableIsVerified/`

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -411,8 +421,46 @@ public void errorOnRecord(T record, Throwable throwable) {
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues) {
+        public List<Cell> preUpsert(byte[] tableName, List<Cell> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the Empty cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private byte[] emptyKeyValueCF;
+        private int emptyKeyValueCFLength;
+        private byte[] emptyKeyValueQualifier;
+        private int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(byte[] emptyKeyValueCF, byte[] emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<Cell> keyValues) {
+            for(int i=0; i < keyValues.size() ; i++) {
+                Cell kv = keyValues.get(i);

Review comment:
       This index-based loop can be replaced by an enhanced for loop:
   ```
   for (Cell kv : keyValues) {
   ```

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
##########
@@ -36,16 +39,30 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;

Review comment:
       A few imports are unused. Nothing urgent; they can be removed when committing the changes.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context context) throws IOExcep
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next();
                 List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<Cell>());
                         }
-                        List<Cell> list = map.get(i);
+                        List<Cell> cellsForTable = map.get(i);
+                        if(indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
+                        }

Review comment:
       This should solve the problem, right?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
##########
@@ -176,17 +179,20 @@ protected void map(LongWritable key, Text value, Context context) throws IOExcep
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next();
                 List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<Cell>());
                         }
-                        List<Cell> list = map.get(i);
+                        List<Cell> cellsForTable = map.get(i);
+                        if(indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
+                        }
                         for (Cell kv : keyValueList) {
-                            list.add(kv);
+                            cellsForTable.add(kv);
                         }

Review comment:
       nit: can be replaced with `cellsForTable.addAll(keyValueList)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org