You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/03/24 02:50:57 UTC
[phoenix] branch 4.x updated: PHOENIX-5747 Add upsert tests for
immutable table indexes
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 000c6a7 PHOENIX-5747 Add upsert tests for immutable table indexes
000c6a7 is described below
commit 000c6a7f2aebf126c9bf4235a9fa29495948b59d
Author: Tanuj Khurana <kh...@gmail.com>
AuthorDate: Mon Mar 2 16:25:03 2020 -0800
PHOENIX-5747 Add upsert tests for immutable table indexes
Signed-off-by: s.kadam <s....@apache.org>
---
.../end2end/index/ImmutableIndexExtendedIT.java | 290 +++++++++++++++++++++
1 file changed, 290 insertions(+)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
new file mode 100644
index 0000000..49219bf
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class ImmutableIndexExtendedIT extends ParallelStatsDisabledIT {
+
+ private final String tableDDLOptions;
+ private final FailingRegionObserver coproc;
+ private final Boolean useView;
+
+ public ImmutableIndexExtendedIT(FailingRegionObserver coproc, Boolean useView) {
+ this.coproc = coproc;
+ this.useView = useView;
+ StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+ this.tableDDLOptions = optionBuilder.toString();
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+
+ private enum FailStep {
+ NONE,
+ PRE_INDEX_TABLE_UPDATE,
+ POST_INDEX_TABLE_UPDATE
+ }
+
+ private boolean getExpectedStatus(FailStep step) {
+ boolean status;
+ switch (step) {
+ case NONE:
+ status = true;
+ break;
+ case PRE_INDEX_TABLE_UPDATE:
+ case POST_INDEX_TABLE_UPDATE:
+ default:
+ status = false;
+ }
+ return status;
+ }
+
+ private int getExpectedUnverifiedRowCount(FailStep step) {
+ int unverifiedRowCount;
+ switch (step) {
+ case POST_INDEX_TABLE_UPDATE:
+ unverifiedRowCount = 1;
+ break;
+ case NONE:
+ case PRE_INDEX_TABLE_UPDATE:
+ default:
+ unverifiedRowCount = 0;
+ }
+ return unverifiedRowCount;
+ }
+
+ interface FailingRegionObserver {
+ FailStep getFailStep();
+ }
+
+ public static class PreMutationFailingRegionObserver extends SimpleRegionObserver
+ implements FailingRegionObserver {
+
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ throw new IOException();
+ }
+
+ @Override
+ public FailStep getFailStep() {
+ return FailStep.PRE_INDEX_TABLE_UPDATE;
+ }
+ }
+
+ public static class PostMutationFailingRegionObserver extends SimpleRegionObserver
+ implements FailingRegionObserver{
+
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ throw new IOException();
+ }
+
+ @Override
+ public FailStep getFailStep() {
+ return FailStep.POST_INDEX_TABLE_UPDATE;
+ }
+ }
+
+ public static class FailOnceMutationRegionObserver extends SimpleRegionObserver
+ implements FailingRegionObserver{
+
+ private boolean failOnce = true;
+
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ if (failOnce) {
+ // next attempt don't raise
+ failOnce = false;
+ throw new IOException();
+ }
+ }
+
+ @Override
+ public FailStep getFailStep() { return FailStep.NONE; }
+ }
+
+ @Parameterized.Parameters(name ="coproc = {0}, useView = {1}")
+ public static Collection<Object[]> data() {
+ List<Object[]> params = Lists.newArrayListWithExpectedSize(6);
+ boolean[] Booleans = new boolean[] { false, true };
+ for (boolean useView : Booleans) {
+ params.add(new Object[]{new PreMutationFailingRegionObserver(), useView});
+ params.add(new Object[]{new PostMutationFailingRegionObserver(), useView});
+ params.add(new Object[]{new FailOnceMutationRegionObserver(), useView});
+ }
+ return params;
+ }
+
+ private void createAndPopulateTable(Connection conn, String tableName, int rowCount)
+ throws Exception {
+ String ddl = "CREATE TABLE " + tableName
+ + " (id integer not null primary key, val1 varchar, val2 varchar, val3 varchar)"
+ + tableDDLOptions;
+ conn.createStatement().execute(ddl);
+ String dml = "UPSERT INTO " + tableName + " (id, val1, val2, val3) VALUES (?, ?, ?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+
+ for (int id = 1; id <= rowCount; ++id) {
+ stmt.setInt(1, id);
+ stmt.setString(2, "a" + id);
+ stmt.setString(3, "ab" + id);
+ stmt.setString(4, "abc" + id);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+
+ private void createView(Connection conn, String dataTable, String viewTable)
+ throws Exception {
+ String ddl = "CREATE VIEW " + viewTable + " AS SELECT * FROM " + dataTable;
+ conn.createStatement().execute(ddl);
+ }
+
+ private void createIndex(Connection conn, String dataTable, String indexTable)
+ throws Exception {
+ String ddl = "CREATE INDEX " + indexTable + " on " + dataTable
+ + " (val1) include (val2, val3)";
+ conn.createStatement().execute(ddl);
+ conn.commit();
+ TestUtil.waitForIndexState(conn, indexTable, PIndexState.ACTIVE);
+ }
+
+ private static int getRowCountForEmptyColValue(Connection conn, String tableName,
+ byte[] valueBytes) throws IOException, SQLException {
+
+ PTable table = PhoenixRuntime.getTable(conn, tableName);
+ byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ ConnectionQueryServices queryServices =
+ conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTable htable = (HTable) queryServices.getTable(table.getPhysicalName().getBytes());
+ Scan scan = new Scan();
+ scan.addColumn(emptyCF, emptyCQ);
+ ResultScanner resultScanner = htable.getScanner(scan);
+ int count = 0;
+
+ for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
+ if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, valueBytes.length,
+ valueBytes, 0, valueBytes.length) == 0) {
+ ++count;
+ }
+ }
+ return count;
+ }
+
+ private static void verifyRowCountForEmptyCol(Connection conn, String indexTable,
+ int expectedVerifiedCount, int expectedUnverifiedCount) throws Exception {
+
+ assertEquals(expectedVerifiedCount,
+ getRowCountForEmptyColValue(conn, indexTable, IndexRegionObserver.VERIFIED_BYTES));
+ assertEquals(expectedUnverifiedCount,
+ getRowCountForEmptyColValue(conn, indexTable, IndexRegionObserver.UNVERIFIED_BYTES));
+ }
+
+ @Test
+ public void testFailingUpsertMutations() throws Exception {
+ String dataTable = "TBL_" + generateUniqueName();
+ String indexTable = "IND_" + generateUniqueName();
+ String viewTable = "VIEW_" + generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ final int initialRowCount = 2;
+ createAndPopulateTable(conn, dataTable, initialRowCount);
+ createView(conn, dataTable, viewTable);
+ String baseTable = useView ? viewTable : dataTable;
+ createIndex(conn, baseTable, indexTable);
+ String index = PhoenixRuntime.getTable(conn, indexTable).getPhysicalName().getString();
+ TestUtil.addCoprocessor(conn, index, coproc.getClass());
+ boolean upsertStatus = true;
+ try {
+ String dml = "UPSERT INTO " + baseTable + " VALUES (3, 'a3', 'ab3', 'abc3')";
+ conn.createStatement().execute(dml);
+ conn.commit();
+ } catch (Exception ex) {
+ upsertStatus = false;
+ }
+ boolean expectedStatus = getExpectedStatus(coproc.getFailStep());
+ assertEquals(expectedStatus, upsertStatus);
+
+ String dql = "SELECT * FROM " + baseTable + " WHERE id = 3";
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ if (!upsertStatus) {
+ // verify that the row was not inserted into the data table
+ assertFalse(rs.next());
+ verifyRowCountForEmptyCol(conn, indexTable, initialRowCount,
+ getExpectedUnverifiedRowCount(coproc.getFailStep()));
+ } else {
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ verifyRowCountForEmptyCol(conn, indexTable, initialRowCount + 1,
+ getExpectedUnverifiedRowCount(coproc.getFailStep()));
+ }
+ TestUtil.removeCoprocessor(conn, index, coproc.getClass());
+ }
+ }
+}