You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/02/22 11:15:42 UTC
[phoenix] branch 4.x updated: PHOENIX-6386 Bulkload generates unverified index rows
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 358ace8 PHOENIX-6386 Bulkload generates unverified index rows
358ace8 is described below
commit 358ace857a378ce84f3552f7a64fff58c74be662
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Feb 18 11:36:31 2021 +0100
PHOENIX-6386 Bulkload generates unverified index rows
---
.../apache/phoenix/end2end/CsvBulkLoadToolIT.java | 39 +++++++++++++-
.../mapreduce/FormatToBytesWritableMapper.java | 61 +++++++++++++++++++---
.../ImportPreUpsertKeyValueProcessor.java | 4 +-
3 files changed, 94 insertions(+), 10 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 2de2c82..762d613 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -21,13 +21,16 @@ import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
@@ -36,16 +39,25 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -318,6 +330,27 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
rs.close();
stmt.close();
+
+ checkIndexTableIsVerified("TABLE3_IDX");
+ }
+
+ private void checkIndexTableIsVerified(String indexTableName) throws SQLException, IOException {
+ ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ Table hTable = cqs.getTable(Bytes.toBytes(indexTableName));
+ PTable pTable = PhoenixRuntime.getTable(conn, indexTableName);
+
+ byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(pTable);
+ byte[] emptyKeyValueQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst();
+
+ Scan scan = new Scan();
+ scan.setFilter(new SingleColumnValueFilter(
+ emptyKeyValueCF,
+ emptyKeyValueQualifier,
+ CompareOp.NOT_EQUAL,
+ IndexRegionObserver.VERIFIED_BYTES));
+ try (ResultScanner scanner = hTable.getScanner(scan)) {
+ assertNull("There are non VERIFIED rows in index", scanner.next());
+ }
}
@Test
@@ -405,6 +438,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
rs.close();
stmt.close();
+
+ if (!localIndex) {
+ checkIndexTableIsVerified(indexTableName);
+ }
}
@Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 63840e7..ecd6110 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -56,6 +57,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,6 +109,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
protected PhoenixConnection conn;
protected UpsertExecutor<RECORD, ?> upsertExecutor;
protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+ protected IndexStatusUpdater[] indexStatusUpdaters;
protected List<String> tableNames;
protected List<String> logicalNames;
protected MapperUpsertListener<RECORD> upsertListener;
@@ -179,18 +182,19 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
while (uncommittedDataIterator.hasNext()) {
Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
List<KeyValue> keyValueList = kvPair.getSecond();
- keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
- byte[] first = kvPair.getFirst();
+ byte[] tableName = kvPair.getFirst();
+ keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
// Create a list of KV for each table
for (int i = 0; i < tableNames.size(); i++) {
- if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+ if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
if (!map.containsKey(i)) {
map.put(i, new ArrayList<KeyValue>());
}
- List<KeyValue> list = map.get(i);
- for (KeyValue kv : keyValueList) {
- list.add(kv);
+ List<KeyValue> cellsForTable = map.get(i);
+ if (indexStatusUpdaters[i] != null) {
+ indexStatusUpdaters[i].setVerfied(keyValueList);
}
+ cellsForTable.addAll(keyValueList);
break;
}
}
@@ -213,6 +217,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
*/
private void initColumnIndexes() throws SQLException {
columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ indexStatusUpdaters = new IndexStatusUpdater[logicalNames.size()];
int columnIndex = 0;
for (int index = 0; index < logicalNames.size(); index++) {
PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -249,6 +254,10 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
+ if (PTable.IndexType.GLOBAL == table.getIndexType()) {
+ indexStatusUpdaters[index] =
+ new IndexStatusUpdater(emptyColumnFamily, emptyKeyValue);
+ }
}
}
@@ -414,8 +423,46 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
ImportPreUpsertKeyValueProcessor {
@Override
- public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+ public List<KeyValue> preUpsert(byte[] tableName, List<KeyValue> keyValues) {
return keyValues;
}
}
+
+ /**
+ * Updates the EMPTY cell value to VERIFIED for global index table rows
+ */
+ private static class IndexStatusUpdater {
+
+ private final byte[] emptyKeyValueCF;
+ private final int emptyKeyValueCFLength;
+ private final byte[] emptyKeyValueQualifier;
+ private final int emptyKeyValueQualifierLength;
+
+ public IndexStatusUpdater(final byte[] emptyKeyValueCF, final byte[] emptyKeyValueQualifier) {
+ this.emptyKeyValueCF = emptyKeyValueCF;
+ this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+ emptyKeyValueCFLength = emptyKeyValueCF.length;
+ emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+ }
+
+ /**
+ * Update the Empty cell values to VERIFIED in the passed keyValues list
+ *
+ * @param keyValues will be modified
+ */
+ public void setVerfied(List<KeyValue> keyValues) {
+ for (int i = 0; i < keyValues.size() ; i++) {
+ Cell kv = keyValues.get(i);
+ if (CellUtil.matchingFamily(kv, emptyKeyValueCF, 0, emptyKeyValueCFLength)
+ && CellUtil.matchingQualifier(kv, emptyKeyValueQualifier, 0, emptyKeyValueQualifierLength)) {
+ if (kv.getValueLength() != 1) {
+ //This should never happen. Fail fast if it does.
+ throw new IllegalArgumentException("Empty cell value length is not 1");
+ }
+ //We are directly overwriting the value for performance
+ kv.getValueArray()[kv.getValueOffset()] = IndexRegionObserver.VERIFIED_BYTES[0];
+ }
+ }
+ }
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
index 6835b81..d6ec3d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
@@ -39,10 +39,10 @@ public interface ImportPreUpsertKeyValueProcessor {
* Implementors can filter certain KeyValues from the list, augment the list, or return the
* same list.
*
- * @param rowKey the row key for the key values that are being passed in
+ * @param tableName the table name for the key values that are being passed in
* @param keyValues list of KeyValues that are to be written to an HFile
* @return the list that will actually be written
*/
- List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues);
+ List<KeyValue> preUpsert(byte[] tableName, List<KeyValue> keyValues);
}