You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/06/12 05:36:45 UTC
[phoenix] branch 4.14-HBase-1.4 updated: PHOENIX-5156 Consistent
Mutable Global Indexes for Non-Transactional Tables
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
new b9a22b9 PHOENIX-5156 Consistent Mutable Global Indexes for Non-Transactional Tables
b9a22b9 is described below
commit b9a22b91d14e59a289f242b5efb507ddc29963b2
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Jun 6 23:10:47 2019 -0700
PHOENIX-5156 Consistent Mutable Global Indexes for Non-Transactional Tables
---
...WALReplayWithIndexWritesAndCompressedWALIT.java | 2 +
.../end2end/ExplainPlanWithStatsEnabledIT.java | 19 +-
.../phoenix/end2end/IndexBuildTimestampIT.java | 2 +-
.../phoenix/end2end/ParallelStatsDisabledIT.java | 2 +-
.../end2end/index/GlobalIndexCheckerIT.java | 253 ++++++
.../end2end/index/GlobalMutableNonTxIndexIT.java | 14 +-
...MutableNonTxIndexWithLazyPostBatchWriteIT.java} | 29 +-
.../end2end/index/MutableIndexFailureIT.java | 1 +
.../index/MutableIndexFailureWithNamespaceIT.java | 3 +-
.../end2end/index/PartialIndexRebuilderIT.java | 3 +-
.../org/apache/phoenix/rpc/PhoenixServerRpcIT.java | 3 +-
.../org/apache/phoenix/util/IndexScrutinyIT.java | 6 +-
.../coprocessor/BaseScannerRegionObserver.java | 5 +
.../UngroupedAggregateRegionObserver.java | 4 +
.../org/apache/phoenix/execute/BaseQueryPlan.java | 4 +-
.../phoenix/filter/ColumnProjectionFilter.java | 10 +
.../EncodedQualifiersColumnProjectionFilter.java | 4 +
.../MultiEncodedCQKeyValueComparisonFilter.java | 6 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 882 +++++++++++++++++++++
.../apache/phoenix/hbase/index/LockManager.java | 20 +-
.../hbase/index/builder/BaseIndexBuilder.java | 4 +-
.../hbase/index/builder/BaseIndexCodec.java | 3 +-
.../hbase/index/builder/IndexBuildManager.java | 32 +-
.../phoenix/hbase/index/builder/IndexBuilder.java | 4 +-
.../phoenix/hbase/index/covered/IndexCodec.java | 2 +-
...a => AbstractParallelWriterIndexCommitter.java} | 110 ++-
.../phoenix/hbase/index/write/IndexCommitter.java | 2 +-
.../phoenix/hbase/index/write/IndexWriter.java | 77 +-
.../LazyParallelWriterIndexCommitter.java} | 34 +-
.../index/write/ParallelWriterIndexCommitter.java | 200 +----
.../TrackingParallelWriterIndexCommitter.java | 14 +-
.../apache/phoenix/index/GlobalIndexChecker.java | 378 +++++++++
.../org/apache/phoenix/index/IndexMaintainer.java | 11 +
.../apache/phoenix/index/PhoenixIndexBuilder.java | 2 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 2 +-
.../phoenix/iterate/TableResultIterator.java | 2 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 40 +-
.../org/apache/phoenix/query/QueryConstants.java | 3 +
.../org/apache/phoenix/query/QueryServices.java | 7 +
.../apache/phoenix/query/QueryServicesOptions.java | 4 +
.../java/org/apache/phoenix/util/IndexUtil.java | 94 ++-
.../java/org/apache/phoenix/query/BaseTest.java | 2 +
42 files changed, 1954 insertions(+), 345 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
index 49933b2..1b8639d 100644
--- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
@@ -64,6 +64,7 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ConfigUtil;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -81,6 +82,7 @@ import org.mockito.Mockito;
* This test should only have a single test - otherwise we will start/stop the minicluster multiple
* times, which is probably not what you want to do (mostly because its so much effort).
*/
+@Ignore
@Category(NeedsOwnMiniClusterTest.class)
public class WALReplayWithIndexWritesAndCompressedWALIT {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index abaa2f6..7326d37 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -160,10 +160,13 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
String sql = "SELECT * FROM " + tableA + " where (c1.a > c2.b) limit 1";
List<Object> binds = Lists.newArrayList();
try (Connection conn = DriverManager.getConnection(getUrl())) {
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertFalse(rs.next());
Estimate info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 691L, info.estimatedBytes);
+ assertEquals((Long) 390L, info.estimatedBytes);
assertEquals((Long) 10L, info.estimatedRows);
assertTrue(info.estimateInfoTs > 0);
+
}
}
@@ -186,8 +189,20 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
List<Object> binds = Lists.newArrayList();
binds.add(0);
try (Connection conn = DriverManager.getConnection(getUrl())) {
+ try (PreparedStatement statement = conn.prepareStatement(sql)) {
+ int paramIdx = 1;
+ for (Object bind : binds) {
+ statement.setObject(paramIdx++, bind);
+ }
+ ResultSet rs = statement.executeQuery(sql);
+ assertTrue(rs.next());
+ assertEquals(100, rs.getInt(1));
+ assertEquals(1, rs.getInt(2));
+ assertEquals(3, rs.getInt(3));
+ assertTrue(rs.next());
+ }
Estimate info = getByteRowEstimates(conn, sql, binds);
- assertEquals((Long) 691L, info.estimatedBytes);
+ assertEquals((Long) 390L, info.estimatedBytes);
assertEquals((Long) 10L, info.estimatedRows);
assertTrue(info.estimateInfoTs > 0);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
index 7efba07..8018567 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -76,7 +76,7 @@ public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT {
@Parameters(
name = "mutable={0},localIndex={1},async={2},view={3}")
public static Collection<Object[]> data() {
- List<Object[]> list = Lists.newArrayListWithExpectedSize(8);
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(16);
boolean[] Booleans = new boolean[]{false, true};
for (boolean mutable : Booleans) {
for (boolean localIndex : Booleans) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
index 98939da..fb980a3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
@@ -33,7 +33,7 @@ import org.junit.experimental.categories.Category;
public abstract class ParallelStatsDisabledIT extends BaseTest {
@BeforeClass
- public static final void doSetup() throws Exception {
+ public static void doSetup() throws Exception {
setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
new file mode 100644
index 0000000..6a28e71
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
+    // When true, the tests create their indexes with ASYNC and build them with the index tool
+    private final boolean async;
+    // Options appended to the CREATE TABLE statement issued by populateTable
+    private final String tableDDLOptions;
+
+    /**
+     * Captures the parameters of one parameterized run.
+     *
+     * @param async   whether indexes are created with the ASYNC option
+     * @param encoded when false, tables are created with COLUMN_ENCODED_BYTES=0;
+     *                when true, no extra table option is added
+     */
+    public GlobalIndexCheckerIT(boolean async, boolean encoded) {
+        this.async = async;
+        // Only the non-encoded case needs an explicit table option
+        this.tableDDLOptions = encoded ? "" : " COLUMN_ENCODED_BYTES=0 ";
+    }
+
+    /**
+     * Starts the test driver once for the class. The property map is left
+     * empty, so no property overrides are passed to the driver.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> overrides = Maps.newHashMapWithExpectedSize(1);
+        ReadOnlyProps driverProps = new ReadOnlyProps(overrides.entrySet().iterator());
+        setUpTestDriver(driverProps);
+    }
+
+ @Parameters(
+ name = "async={0},encoded={1}")
+ public static Collection<Object[]> data() {
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(4);
+ boolean[] Booleans = new boolean[]{true, false};
+ for (boolean async : Booleans) {
+ for (boolean encoded : Booleans) {
+ list.add(new Object[]{async, encoded});
+ }
+ }
+ return list;
+ }
+
+    /**
+     * Runs {@code EXPLAIN} on the given query and delegates to
+     * {@link IndexToolIT#assertExplainPlan} to check the resulting plan against
+     * the data table and index table names.
+     *
+     * @param conn               open connection used to run the EXPLAIN statement
+     * @param selectSql          the query whose plan is checked
+     * @param dataTableFullName  full name of the data table
+     * @param indexTableFullName full name of the index the plan is expected to use
+     * @throws SQLException if the EXPLAIN statement fails
+     */
+    public static void assertExplainPlan(Connection conn, String selectSql,
+            String dataTableFullName, String indexTableFullName) throws SQLException {
+        String actualExplainPlan;
+        // The plan is copied into a String, so the result set can be closed right away
+        try (ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql)) {
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+        }
+        // NOTE(review): the first argument is presumably the "local index" flag - confirm against IndexToolIT
+        IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
+    }
+
+    /**
+     * Creates the data table (id, val1, val2, val3 - all varchar(10), id is the
+     * primary key) with this run's table options and loads two rows, 'a' and
+     * 'b', committing after each upsert.
+     *
+     * @param tableName name of the table to create and populate
+     * @throws Exception if the connection, the DDL or an upsert fails
+     */
+    private void populateTable(String tableName) throws Exception {
+        // try-with-resources closes the connection even when the DDL or an upsert throws
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("create table " + tableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+            conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+        }
+    }
+
+    /**
+     * Deletes a data table row while the post index update phase is skipped and
+     * checks that a later DML which scans the index does not see the stale
+     * index row, leaving exactly one row in the index table.
+     */
+    @Test
+    public void testSkipPostIndexDeleteUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        // try-with-resources closes the connection even when an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName);
+            }
+            String selectSql = "SELECT id from " + dataTableName + " WHERE val1 = 'ab'";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertFalse(rs.next());
+
+            // Configure Indexer to skip the last write phase (i.e., the post index update phase) where the verify flag is set
+            // to true and/or index rows are deleted and check that this does not impact the correctness
+            IndexRegionObserver.setSkipPostIndexUpdatesForTesting(true);
+            try {
+                String dml = "DELETE from " + dataTableName + " WHERE id = 'a'";
+                assertEquals(1, conn.createStatement().executeUpdate(dml));
+                conn.commit();
+
+                // The index rows are actually not deleted yet because Indexer skipped the delete operation. However, they are
+                // made unverified in the pre index update phase (i.e., the first write phase)
+                dml = "DELETE from " + dataTableName + " WHERE val1 = 'ab'";
+                // This DML will scan the index table and detect unverified index rows. This will trigger read repair, which
+                // results in deleting these rows since the corresponding data table rows are deleted already. So, the number of
+                // rows to be deleted by the "DELETE" DML will be zero since the rows deleted by read repair will not be visible
+                // to the DML
+                assertEquals(0,conn.createStatement().executeUpdate(dml));
+
+                // Count the number of index rows
+                String query = "SELECT COUNT(*) from " + indexName;
+                // There should be one row in the index table
+                rs = conn.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+            } finally {
+                // The flag is set through a static method, so it outlives this test; turn it off again
+                // so that tests running afterwards are not silently executed with the post index phase skipped
+                IndexRegionObserver.setSkipPostIndexUpdatesForTesting(false);
+            }
+        }
+    }
+
+    /**
+     * Upserts only a subset of the columns of existing and new rows and checks
+     * that a query served from the index returns the merged, up-to-date row.
+     */
+    @Test
+    public void testPartialRowUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        // try-with-resources closes the connection even when an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName);
+            }
+            // Update val2 of existing row 'a' and insert row 'c' with only val2 set
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('c', 'cde')");
+            conn.commit();
+            String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'ab'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexName);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("ab", rs.getString(2));
+            assertEquals("abcc", rs.getString(3));
+            assertEquals("abcd", rs.getString(4));
+            assertFalse(rs.next());
+
+            // Rewrite val1 with the same value and change val3; val2 must keep its earlier update
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab', 'abcdd')");
+            conn.commit();
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("ab", rs.getString(2));
+            assertEquals("abcc", rs.getString(3));
+            assertEquals("abcdd", rs.getString(4));
+            assertFalse(rs.next());
+        }
+    }
+
+    /**
+     * Performs partial row upserts while the post index update phase is skipped
+     * and checks that a query served from the index still returns correct values.
+     */
+    @Test
+    public void testSkipPostIndexPartialRowUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        // try-with-resources closes the connection even when an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName);
+            }
+            // Configure Indexer to skip the last write phase (i.e., the post index update phase) where the verify flag is set
+            // to true and/or index rows are deleted and check that this does not impact the correctness
+            IndexRegionObserver.setSkipPostIndexUpdatesForTesting(true);
+            try {
+                conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+                conn.commit();
+                conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'cd','cde')");
+                conn.commit();
+                String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab'";
+                // Verify that we will read from the index table
+                assertExplainPlan(conn, selectSql, dataTableName, indexName);
+                ResultSet rs = conn.createStatement().executeQuery(selectSql);
+                assertTrue(rs.next());
+                assertEquals("abcc", rs.getString(1));
+                assertEquals("abcd", rs.getString(2));
+                assertFalse(rs.next());
+            } finally {
+                // The flag is set through a static method, so it outlives this test; turn it off again
+                // so that tests running afterwards are not silently executed with the post index phase skipped
+                IndexRegionObserver.setSkipPostIndexUpdatesForTesting(false);
+            }
+        }
+    }
+
+    /**
+     * Issues one upsert while both the data table update and the post index
+     * update phases are skipped, then a normal upsert, and checks through two
+     * different indexes that only the second upsert is visible.
+     */
+    @Test
+    public void testSkipDataTableAndPostIndexPartialRowUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        // try-with-resources closes the connection even when an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
+                    dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
+            }
+            // Configure Indexer to skip the last two write phases (i.e., the data table update and post index update phases)
+            // and check that this does not impact the correctness
+            IndexRegionObserver.setSkipDataTableUpdatesForTesting(true);
+            IndexRegionObserver.setSkipPostIndexUpdatesForTesting(true);
+            try {
+                conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+                conn.commit();
+            } finally {
+                // Both flags are set through static methods; reset them even if the upsert or commit
+                // throws so that the rest of this test and later tests run with normal index maintenance
+                IndexRegionObserver.setSkipDataTableUpdatesForTesting(false);
+                IndexRegionObserver.setSkipPostIndexUpdatesForTesting(false);
+            }
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val3) values ('a', 'abcdd')");
+            conn.commit();
+            String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab'";
+            // Verify that we will read from the first index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexName + "1");
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            // val2 keeps its original value because the 'abcc' upsert never reached the data table
+            assertEquals("abc", rs.getString(1));
+            assertEquals("abcdd", rs.getString(2));
+            assertFalse(rs.next());
+            selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val2 = 'abc'";
+            // Verify that we will read from the second index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexName + "2");
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("abc", rs.getString(1));
+            assertEquals("abcdd", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
index f37182d..6704b5b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
@@ -20,19 +20,23 @@ package org.apache.phoenix.end2end.index;
import java.util.Arrays;
import java.util.Collection;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.junit.runners.Parameterized.Parameters;
public class GlobalMutableNonTxIndexIT extends BaseIndexIT {
- public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded) {
+ public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded, boolean skipPostIndexUpdates) {
super(localIndex, mutable, transactional, columnEncoded);
+ IndexRegionObserver.setSkipPostIndexUpdatesForTesting(skipPostIndexUpdates);
}
- @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactional={2},columnEncoded={3}") // name is used by failsafe as file name in reports
+ @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactional={2},columnEncoded={3},skipPostIndexUpdates={4}") // name is used by failsafe as file name in reports
public static Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] {
- { false, true, false, false }, { false, true, false, true }
- });
+ {false, true, false, false, false},
+ {false, true, false, false, true},
+ {false, true, false, true, false},
+ {false, true, false, true, true}
+ });
}
-
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexWithLazyPostBatchWriteIT.java
similarity index 53%
copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexWithLazyPostBatchWriteIT.java
index 98939da..1422684 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexWithLazyPostBatchWriteIT.java
@@ -15,30 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.phoenix.end2end.index;
-package org.apache.phoenix.end2end;
+import java.util.Map;
-import org.apache.phoenix.query.BaseTest;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-/**
- * Base class for tests whose methods run in parallel with statistics disabled.
- * You must create unique names using {@link #generateUniqueName()} for each
- * table and sequence used to prevent collisions.
- */
-@Category(ParallelStatsDisabledTest.class)
-public abstract class ParallelStatsDisabledIT extends BaseTest {
+public class GlobalMutableNonTxIndexWithLazyPostBatchWriteIT extends GlobalMutableNonTxIndexIT {
- @BeforeClass
- public static final void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+ public GlobalMutableNonTxIndexWithLazyPostBatchWriteIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded, boolean skipPostIndexUpdates) {
+ super(localIndex, mutable, transactional, columnEncoded, skipPostIndexUpdates);
}
- @AfterClass
- public static void tearDownMiniCluster() throws Exception {
- BaseTest.tearDownMiniClusterIfBeyondThreshold();
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+ props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 8f88513..9a103af 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -136,6 +136,7 @@ public class MutableIndexFailureIT extends BaseTest {
Map<String, String> serverProps = getServerProps();
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
indexRebuildTaskRegionEnvironment =
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
index 5ed9e1f..0633888 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
@@ -49,9 +49,10 @@ public class MutableIndexFailureWithNamespaceIT extends MutableIndexFailureIT {
public static void doSetup() throws Exception {
Map<String, String> serverProps = getServerProps();
serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(3);
clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
TableName systemTable = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index cda282b..f5ef58d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -96,8 +96,9 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
- Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
indexRebuildTaskRegionEnvironment =
(RegionCoprocessorEnvironment) getUtility()
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index ab05c16..c49a822 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -99,7 +99,8 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
ensureTablesOnDifferentRegionServers(dataTableFullName, indexTableFullName);
TestPhoenixIndexRpcSchedulerFactory.reset();
upsertRow(conn, dataTableFullName);
- Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor())
+ // An index row is updated twice, once before the data table row, and once after it. Thus, the factory is invoked twice
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.times(2))
.dispatch(Mockito.any(CallRunner.class));
TestPhoenixIndexRpcSchedulerFactory.reset();
// run select query that should use the index
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index 3277e32..e7e27a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -48,7 +48,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
fail();
} catch (AssertionError e) {
- assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>");
+ assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage());
}
}
}
@@ -72,7 +72,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
fail();
} catch (AssertionError e) {
- assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')");
+ assertEquals("Expected to find PK in data table: ('x')", e.getMessage());
}
}
}
@@ -97,7 +97,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
fail();
} catch (AssertionError e) {
- assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'");
+ assertEquals("Expected equality for V2, but '2'!='1'", e.getMessage());
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b02d5b4..24374b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -108,6 +108,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme";
public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList";
public static final String CLIENT_VERSION = "_ClientVersion";
+ public static final String CHECK_VERIFY_COLUMN = "_CheckVerifyColumn";
+ public static final String PHYSICAL_DATA_TABLE_NAME = "_PhysicalDataTableName";
+ public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
+ public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
+ public static final String SCAN_LIMIT = "_ScanLimit";
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c10dd07..161f9ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1045,6 +1045,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+ byte[] scanLimitBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_LIMIT);
+ int scanLimit = (scanLimitBytes != null) ? Bytes.toInt(scanLimitBytes) : 0;
boolean hasMore;
int rowCount = 0;
try {
@@ -1099,6 +1101,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
mutations.clear();
}
rowCount++;
+ if (rowCount == scanLimit)
+ break;
}
} while (hasMore);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 25980b9..bee9bde 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -390,7 +390,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
}
}
- private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
+ public static void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
int nViewConstants = 0;
if (dataTable.getType() == PTableType.VIEW) {
@@ -420,7 +420,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
}
}
- private void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) {
+ private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
DataOutputStream output = new DataOutputStream(stream);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index 3d6843d..6e84b5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -189,4 +189,14 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
public ReturnCode filterKeyValue(Cell ignored) throws IOException {
return ReturnCode.INCLUDE_AND_NEXT_COL;
}
+
+    /**
+     * Adds a column to the set of columns tracked by this projection filter.
+     * The qualifier is stored in the per-family set held by {@code columnsTracker};
+     * the set is created the first time a family is seen.
+     *
+     * @param cf column family of the column to track
+     * @param cq column qualifier of the column to track
+     */
+    public void addTrackedColumn(ImmutableBytesPtr cf, ImmutableBytesPtr cq) {
+        NavigableSet<ImmutableBytesPtr> qualifiers = columnsTracker.get(cf);
+        if (qualifiers != null) {
+            qualifiers.add(cq);
+            return;
+        }
+        // First column seen for this family: register a fresh set, then record the qualifier
+        qualifiers = new TreeSet<>();
+        columnsTracker.put(cf, qualifiers);
+        qualifiers.add(cq);
+    }
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java
index cfacb4f..a920471 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java
@@ -144,6 +144,10 @@ public class EncodedQualifiersColumnProjectionFilter extends FilterBase implemen
public ReturnCode filterKeyValue(Cell ignored) throws IOException {
return ReturnCode.INCLUDE_AND_NEXT_COL;
}
+
+ /**
+ * Marks the given encoded column qualifier as tracked by calling {@code trackedColumns.set(qualifier)}.
+ * NOTE(review): trackedColumns is presumably a BitSet indexed by encoded qualifier - confirm.
+ *
+ * @param qualifier the encoded column qualifier to start tracking
+ */
+ public void addTrackedColumn(int qualifier) {
+ trackedColumns.set(qualifier);
+ }
interface ColumnTracker {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java
index 4e26515..b14a957 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java
@@ -375,7 +375,11 @@ public class MultiEncodedCQKeyValueComparisonFilter extends BooleanExpressionFil
this.minQualifier = minMaxQualifiers.getFirst();
this.maxQualifier = minMaxQualifiers.getSecond();
}
-
+
+ /**
+ * Overrides the minimum qualifier of this filter.
+ * NOTE(review): callers presumably lower it so that an extra column (e.g. the empty column) is covered - confirm.
+ *
+ * @param minQualifier the new value for {@code minQualifier}
+ */
+ public void setMinQualifier(int minQualifier) {
+ this.minQualifier = minQualifier;
+ }
+
public static MultiEncodedCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException {
try {
return (MultiEncodedCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new MultiEncodedCQKeyValueComparisonFilter());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
new file mode 100644
index 0000000..520aff3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.LockManager.RowLock;
+import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
+import org.apache.phoenix.hbase.index.builder.IndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
+import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
+ * to an {@link IndexBuilder} to determine the actual updates to make.
+ * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
+ * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because
+ * Phoenix always does batch mutations.
+ * <p>
+ */
+public class IndexRegionObserver extends BaseRegionObserver {
+
+ private static final Log LOG = LogFactory.getLog(IndexRegionObserver.class);
+ private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
+ private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS);
+ protected static final byte VERIFIED_BYTE = 1;
+ protected static final byte UNVERIFIED_BYTE = 2;
+ public static final byte[] UNVERIFIED_BYTES = new byte[] { UNVERIFIED_BYTE };
+ public static final byte[] VERIFIED_BYTES = new byte[] { VERIFIED_BYTE };
+
+    /**
+     * Bookkeeping entry for a data table row with mutations in progress. It keeps a
+     * counter of the registered mutations and the latest timestamp seen among them.
+     */
+    private static class PendingRow {
+        private long latestTimestamp;
+        // A PendingRow is created for the first mutation on the row, hence the initial count of one
+        private long count = 1;
+
+        PendingRow(long latestTimestamp) {
+            this.latestTimestamp = latestTimestamp;
+        }
+
+        /** Registers one more mutation and advances the latest timestamp if {@code timestamp} is newer. */
+        public void add(long timestamp) {
+            count += 1;
+            latestTimestamp = Math.max(latestTimestamp, timestamp);
+        }
+
+        /** Unregisters one mutation. */
+        public void remove() {
+            count -= 1;
+        }
+
+        /** @return the number of mutations currently registered for the row */
+        public long getCount() {
+            return count;
+        }
+
+        /** @return the largest timestamp registered for the row so far */
+        public long getLatestTimestamp() {
+            return latestTimestamp;
+        }
+    }
+
+ private static boolean skipPostIndexUpdatesForTesting = false;
+ private static boolean skipDataTableUpdatesForTesting = false;
+
+ /**
+ * Test hook that sets the static {@code skipPostIndexUpdatesForTesting} flag.
+ * NOTE(review): the code that reads the flag is not visible here; presumably it makes the
+ * observer skip the index updates that follow the data table write - confirm.
+ *
+ * @param skip the new value of the flag
+ */
+ public static void setSkipPostIndexUpdatesForTesting(boolean skip) {
+ skipPostIndexUpdatesForTesting = skip;
+ }
+
+ /**
+ * Test hook that sets the static {@code skipDataTableUpdatesForTesting} flag. While the flag
+ * is set, groupMutations marks every indexed mutation of a batch with the NOWRITE status.
+ *
+ * @param skip the new value of the flag
+ */
+ public static void setSkipDataTableUpdatesForTesting(boolean skip) {
+ skipDataTableUpdatesForTesting = skip;
+ }
+
+ // Per-batch state shared between the coprocessor hooks of a single batch mutation.
+ // Hack to get around not being able to save any state between
+ // coprocessor calls. TODO: remove after HBASE-18127 when available
+ private static class BatchMutateContext {
+ // Client version taken from the index metadata of the batch
+ private final int clientVersion;
+ // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
+ // the verified column) carries the "unverified" value (UNVERIFIED_BYTES) on these mutations, except for index
+ // rebuild puts, which are written as "verified" right away
+ private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+ // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
+ // the verified column) carries the "verified" value (VERIFIED_BYTES) on the put mutations
+ private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+ // The collection of candidate index mutations that will be applied after the data table mutations. Each entry
+ // pairs an (index mutation, index table name) pair with the data table row key it was generated for
+ private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+ // The row locks currently held on behalf of this batch
+ private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+ // The set of row keys for the data table rows of this batch such that for each of these rows there exists another
+ // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
+ // row (i.e., concurrent updates).
+ private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+
+ private BatchMutateContext(int clientVersion) {
+ this.clientVersion = clientVersion;
+ }
+ }
+
+ private ThreadLocal<BatchMutateContext> batchMutateContext =
+ new ThreadLocal<BatchMutateContext>();
+
+ /** Configuration key for the {@link IndexBuilder} to use */
+ public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
+
+ /**
+ * Configuration key for whether the indexer should check the version of HBase that is running. Generally,
+ * you only want to ignore this for testing or for custom versions of HBase.
+ */
+ public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
+
+ private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
+
+ public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
+ private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
+
+ private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
+ private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
+ private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
+ private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
+ private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
+ private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
+ private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
+ private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
+
+ // Index writers get invoked before and after data table updates
+ protected IndexWriter preWriter;
+ protected IndexWriter postWriter;
+
+ protected IndexBuildManager builder;
+ private LockManager lockManager;
+
+ // The collection of pending data table rows
+ private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>();
+
+ /**
+ * Cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
+ * more robust in the face of recovering index regions that were on the same server as the
+ * primary table region.
+ */
+ private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
+
+ private MetricsIndexerSource metricSource;
+
+ private boolean stopped;
+ private boolean disabled;
+ private long slowIndexWriteThreshold;
+ private long slowIndexPrepareThreshold;
+ private long slowPostOpenThreshold;
+ private long slowPreIncrementThreshold;
+ private int rowLockWaitDuration;
+
+ private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+
+ /**
+ * Initializes the coprocessor: optionally validates the HBase version, then creates the index
+ * build manager, the pre/post index writers, the lock manager and the metrics source, and reads
+ * the slow-call thresholds. If a {@link NoSuchMethodError} is raised along the way the coprocessor
+ * marks itself disabled instead of failing.
+ *
+ * @param e the coprocessor environment; cast to {@link RegionCoprocessorEnvironment}
+ * @throws IOException if the version check fails (the region server is also asked to abort)
+ */
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+ try {
+ final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+ String serverName = env.getRegionServerServices().getServerName().getServerName();
+ if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
+ // make sure the right version <-> combinations are allowed.
+ String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
+ if (errormsg != null) {
+ IOException ioe = new IOException(errormsg);
+ env.getRegionServerServices().abort(errormsg, ioe);
+ throw ioe;
+ }
+ }
+ this.builder = new IndexBuildManager(env);
+ // Wrap the environment so that the index writers are created with the INDEX_WRITER_CONNECTION connection type
+ DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
+ // setup the index writer used before the data table updates (preWriter)
+ this.preWriter = new IndexWriter(indexWriterEnv, serverName + "-index-preWriter", false);
+ // With lazy post batch writes enabled the post writer gets its own lazy committer; otherwise
+ // the same writer instance serves both phases
+ if (env.getConfiguration().getBoolean(INDEX_LAZY_POST_BATCH_WRITE, INDEX_LAZY_POST_BATCH_WRITE_DEFAULT)) {
+ this.postWriter = new IndexWriter(indexWriterEnv, new LazyParallelWriterIndexCommitter(), serverName + "-index-postWriter", false);
+ }
+ else {
+ this.postWriter = this.preWriter;
+ }
+
+ this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
+ DEFAULT_ROWLOCK_WAIT_DURATION);
+ this.lockManager = new LockManager();
+
+ // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
+ this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+ setSlowThresholds(e.getConfiguration());
+ } catch (NoSuchMethodError ex) {
+ // A missing method means the HBase version lacks an API used above: disable the
+ // coprocessor rather than fail
+ disabled = true;
+ super.start(e);
+ LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
+ }
+ }
+
+    /**
+     * Extracts the slow call threshold values (in milliseconds) from the configuration.
+     * Each threshold is read from its own key and falls back to its own default:
+     * the index write threshold from {@code INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY} and
+     * the index prepare threshold from {@code INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY}.
+     *
+     * @param c the configuration to read the thresholds from
+     */
+    private void setSlowThresholds(Configuration c) {
+        // The write and prepare thresholds used to be read from each other's key, so tuning
+        // one configuration property silently changed the other threshold.
+        slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
+            INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
+        slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
+            INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT);
+        slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
+            INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
+        slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
+            INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
+    }
+
+    /**
+     * Formats the log message emitted when a call takes at least as long as its threshold.
+     *
+     * @param callName name of the slow call
+     * @param duration measured duration in milliseconds
+     * @param threshold configured threshold in milliseconds
+     * @return the formatted "(callTooSlow)" message
+     */
+    private String getCallTooSlowMessage(String callName, long duration, long threshold) {
+        return "(callTooSlow) " + callName + " duration=" + duration
+            + "ms, threshold=" + threshold + "ms";
+    }
+
+    /**
+     * Stops the index builder and both index writers. Does nothing when the coprocessor
+     * has already been stopped or was disabled during {@code start}.
+     */
+    @Override
+    public void stop(CoprocessorEnvironment e) throws IOException {
+        if (this.stopped || this.disabled) {
+            return;
+        }
+        this.stopped = true;
+        final String reason = "Indexer is being stopped";
+        this.builder.stop(reason);
+        this.preWriter.stop(reason);
+        this.postWriter.stop(reason);
+    }
+
+    /**
+     * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
+     * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
+     * real increment, though, it's really more of a Put. We translate the Increment into a
+     * list of mutations, at most a single Put and Delete that are the changes upon executing
+     * the list of ON DUPLICATE KEY clauses for this row.
+     *
+     * @param e the observer context; bypassed and completed when the Increment is an atomic op
+     * @param inc the Increment carrying the serialized ON DUPLICATE KEY clause
+     * @return {@code null} when the builder does not treat the Increment as an atomic operation
+     *         (normal processing continues), otherwise {@link Result#EMPTY_RESULT}
+     * @throws IOException wrapping any failure raised while executing the atomic operation
+     */
+    @Override
+    public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+            final Increment inc) throws IOException {
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        try {
+            List<Mutation> mutations = this.builder.executeAtomicOp(inc);
+            if (mutations == null) {
+                return null;
+            }
+            // Causes the Increment to be ignored as we're committing the mutations
+            // ourselves below.
+            e.bypass();
+            e.complete();
+            // ON DUPLICATE KEY IGNORE will return empty list if row already exists
+            // as no action is required in that case.
+            if (!mutations.isEmpty()) {
+                Region region = e.getEnvironment().getRegion();
+                // Otherwise, submit the mutations directly here
+                region.batchMutate(mutations.toArray(new Mutation[0]), HConstants.NO_NONCE,
+                    HConstants.NO_NONCE);
+            }
+            return Result.EMPTY_RESULT;
+        } catch (Throwable t) {
+            throw ServerUtil.createIOException(
+                "Unable to process ON DUPLICATE IGNORE for " +
+                e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() +
+                "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+        } finally {
+            long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+            // Compare against the pre-increment threshold (the one that is also logged); the
+            // index prepare threshold belongs to preBatchMutate.
+            if (duration >= slowPreIncrementThreshold) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
+                }
+                metricSource.incrementSlowDuplicateKeyCheckCalls();
+            }
+            metricSource.updateDuplicateKeyCheckTime(duration);
+        }
+    }
+
+ /**
+ * Entry point invoked before a mini batch is applied. Delegates to
+ * {@link #preBatchMutateWithExceptions} and records how long the call took; does nothing when
+ * the coprocessor is disabled.
+ *
+ * @param c the observer context of the region
+ * @param miniBatchOp the mini batch of mutations about to be applied
+ * @throws IOException NOTE(review): presumably raised by rethrowIndexingException for any failure - confirm
+ */
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ if (this.disabled) {
+ return;
+ }
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ try {
+ preBatchMutateWithExceptions(c, miniBatchOp);
+ return;
+ } catch (Throwable t) {
+ // Expected to always throw; the RuntimeException at the end of the method guards against it returning
+ rethrowIndexingException(t);
+ } finally {
+ // Record the duration of the call and flag it when it reaches the prepare threshold
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexPrepareThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
+ }
+ metricSource.incrementNumSlowIndexPrepareCalls();
+ }
+ metricSource.updateIndexPrepareTime(duration);
+ }
+ // Only reached if rethrowIndexingException returned normally instead of throwing
+ throw new RuntimeException(
+ "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+ }
+
+    /**
+     * Finds the largest cell timestamp carried by a mutation.
+     *
+     * @param m the mutation whose cells are inspected
+     * @return the maximum timestamp over all cells of all column families, or 0 when the
+     *         mutation has no cell with a positive timestamp
+     */
+    private long getMaxTimestamp(Mutation m) {
+        long maxTs = 0;
+        // Typed for-each loops replace the raw Iterator and the unchecked Map.Entry cast
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                maxTs = Math.max(maxTs, cell.getTimestamp());
+            }
+        }
+        return maxTs;
+    }
+
+    /**
+     * Marks every operation of the mini batch that the builder identifies as an atomic
+     * operation with the IGNORE status, so that the other batch helpers skip it.
+     *
+     * @param miniBatchOp the mini batch to scan
+     */
+    private void ignoreAtomicOperations (MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+        for (int opIndex = 0; opIndex < miniBatchOp.size(); opIndex++) {
+            if (this.builder.isAtomicOp(miniBatchOp.getOperation(opIndex))) {
+                miniBatchOp.setOperationStatus(opIndex, IGNORE);
+            }
+        }
+    }
+
+    /**
+     * Acquires a row lock for every non-ignored operation of the mini batch for which indexing
+     * is enabled, and records the locks in the batch context.
+     *
+     * @param miniBatchOp the mini batch whose rows are locked
+     * @param context the batch context that collects the acquired locks
+     * @throws IOException if a row lock cannot be acquired
+     */
+    private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException {
+        for (int opIndex = 0; opIndex < miniBatchOp.size(); opIndex++) {
+            if (miniBatchOp.getOperationStatus(opIndex) != IGNORE) {
+                Mutation mutation = miniBatchOp.getOperation(opIndex);
+                if (this.builder.isEnabled(mutation)) {
+                    RowLock acquired = lockManager.lockRow(mutation.getRow(), rowLockWaitDuration);
+                    context.rowLocks.add(acquired);
+                }
+            }
+        }
+    }
+
+    /**
+     * Registers the locked rows of this batch in the region-wide collection of pending rows.
+     * A row that is already pending belongs to another in-progress batch as well; such rows are
+     * additionally remembered in the batch context as concurrently updated.
+     *
+     * @param context the batch context holding the row locks of this batch
+     * @param now the timestamp of this batch
+     */
+    private void populatePendingRows(BatchMutateContext context, long now) {
+        for (RowLock rowLock : context.rowLocks) {
+            ImmutableBytesPtr rowKey = rowLock.getRowKey();
+            PendingRow existing = pendingRows.get(rowKey);
+            if (existing != null) {
+                // The row already has a pending mutation in progress from another batch
+                existing.add(now);
+                context.pendingRows.add(rowKey);
+                continue;
+            }
+            pendingRows.put(rowKey, new PendingRow(now));
+        }
+    }
+
+ /**
+ * Collects the mutations of the mini batch that need index maintenance. Operations marked
+ * IGNORE and mutations for which indexing is not enabled are left out. When the batch contains
+ * more than one mutation for the same row, the mutations of each row are merged into one
+ * MultiMutation; otherwise the original mutations are returned as they are.
+ *
+ * @param miniBatchOp the mini batch being processed
+ * @param now the timestamp stamped on every cell of the collected mutations unless this is a replay
+ * @param replayWrite the replay mode of the batch, or {@code null} for a regular write
+ * @return the grouped mutations (flattened by timestamp when rows were merged or on replay),
+ * or {@code null} when the batch has no mutation to index
+ * @throws IOException declared for the builder and flattening helpers called here
+ */
+ private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ long now, ReplayWrite replayWrite) throws IOException {
+ Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
+ boolean copyMutations = false;
+ // First pass: collect the distinct row keys and detect rows that occur more than once
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (this.builder.isEnabled(m)) {
+ // Track whether or not we need to copy the mutations, i.e. whether a row occurs more than once
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ if (mutationsMap.containsKey(row)) {
+ copyMutations = true;
+ } else {
+ // Placeholder entry; the MultiMutation is only created in the second pass when copying
+ mutationsMap.put(row, null);
+ }
+ }
+ }
+ // early exit if it turns out we don't have any edits
+ if (mutationsMap.isEmpty()) {
+ return null;
+ }
+ // If we're copying the mutations, the result is the map's values; otherwise it is a plain list of the originals
+ Collection<Mutation> originalMutations;
+ Collection<? extends Mutation> mutations;
+ if (copyMutations) {
+ originalMutations = null;
+ mutations = mutationsMap.values();
+ } else {
+ originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
+ mutations = originalMutations;
+ }
+
+ boolean resetTimeStamp = replayWrite == null;
+
+ // Second pass: stamp, flag and collect the mutations
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ // skip this mutation if we aren't enabling indexing
+ // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+ // should be indexed, which means we need to expose another method on the builder. Such is the
+ // way optimization go though.
+ if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+ if (resetTimeStamp) {
+ // Unless we're replaying edits to rebuild the index, we update the time stamp
+ // of the data table to prevent overlapping time stamps (which prevents index
+ // inconsistencies as this case isn't handled correctly currently).
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ CellUtil.setTimestamp(cell, now);
+ }
+ }
+ }
+ // No need to write the table mutations when we're rebuilding
+ // the index as they're already written and just being replayed.
+ // The same status is applied when data table updates are skipped for testing.
+ if (replayWrite == ReplayWrite.INDEX_ONLY
+ || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY || skipDataTableUpdatesForTesting) {
+ miniBatchOp.setOperationStatus(i, NOWRITE);
+ }
+
+ // Only copy mutations if we found duplicate rows
+ // which only occurs when we're partially rebuilding
+ // the index (since we'll potentially have both a
+ // Put and a Delete mutation for the same row).
+ if (copyMutations) {
+ // Add the mutation to the batch set
+
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ MultiMutation stored = mutationsMap.get(row);
+ // we haven't seen this row before, so add it
+ if (stored == null) {
+ stored = new MultiMutation(row);
+ mutationsMap.put(row, stored);
+ }
+ stored.addAll(m);
+ } else {
+ originalMutations.add(m);
+ }
+ }
+ }
+
+ // Merged rows and replayed edits may carry several timestamps, so split them per timestamp
+ if (copyMutations || replayWrite != null) {
+ mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+ }
+ return mutations;
+ }
+
+ /**
+ * Builds the index updates for the batch and distributes them over the write phases:
+ * <ul>
+ * <li>updates targeting this region's own table are added to the mini batch itself;</li>
+ * <li>index rebuild updates go to {@code context.preIndexUpdates}, with the empty column of puts set to verified;</li>
+ * <li>regular puts go to {@code context.preIndexUpdates} with the empty column set to unverified, and a put
+ * that sets the empty column to verified is queued in {@code context.intermediatePostIndexUpdates};</li>
+ * <li>regular deletes are replaced in {@code context.preIndexUpdates} by a put that marks the index row
+ * unverified, and the delete itself is queued in {@code context.intermediatePostIndexUpdates}.</li>
+ * </ul>
+ * Nothing is queued in {@code context.intermediatePostIndexUpdates} for rows listed in {@code context.pendingRows}.
+ *
+ * @param c the observer context of the region
+ * @param miniBatchOp the mini batch being processed
+ * @param context the batch context receiving the prepared index updates
+ * @param mutations the grouped data table mutations to build index updates for
+ * @throws Throwable if the index metadata or an index maintainer is missing, or index building fails
+ */
+ private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
+ Collection<? extends Mutation> mutations) throws Throwable {
+ IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+ if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+ throw new DoNotRetryIOException(
+ "preBatchMutateWithExceptions: indexMetaData is not an instance of PhoenixIndexMetaData " +
+ c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
+
+ List<Pair<Mutation, byte[]>> indexUpdatesForDeletes;
+ // get the current span, or just use a null-span to avoid a bunch of if statements
+ try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+ Span current = scope.getSpan();
+ if (current == null) {
+ current = NullSpan.INSTANCE;
+ }
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+
+ // get the index updates for all elements in this batch
+ Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
+ this.builder.getIndexUpdates(miniBatchOp, mutations);
+
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexPrepareThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
+ }
+ metricSource.incrementNumSlowIndexPrepareCalls();
+ }
+ metricSource.updateIndexPrepareTime(duration);
+ current.addTimelineAnnotation("Built index updates, doing preStep");
+ TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+ byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+ Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+ List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+ indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
+ context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
+ while(indexUpdatesItr.hasNext()) {
+ Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
+ // An update whose target table is this region's table is applied as part of the mini batch itself
+ if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
+ localUpdates.add(next.getFirst().getFirst());
+ indexUpdatesItr.remove();
+ }
+ else {
+ // get index maintainer for this index table
+ IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
+ if (indexMaintainer == null) {
+ throw new DoNotRetryIOException(
+ "preBatchMutateWithExceptions: indexMaintainer is null " +
+ c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ // add the verified/unverified marker cell, which is the empty cell
+ Mutation m = next.getFirst().getFirst();
+ boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+ long ts = getMaxTimestamp(m);
+ if (rebuild) {
+ // Index rebuild puts are written as verified right away; rebuild deletes are left untouched
+ if (m instanceof Put) {
+ ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+ }
+ } else {
+ if (m instanceof Put) {
+ ((Put)m).addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
+ // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+ // going through concurrent updates
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+ if (!context.pendingRows.contains(rowKey)) {
+ // The post update only flips the empty cell of the index row to verified
+ Put put = new Put(m.getRow());
+ put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+ context.intermediatePostIndexUpdates.add(new Pair<>(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()), next.getSecond()));
+ }
+ } else {
+ // For a delete mutation, first unverify the existing row in the index table and then delete
+ // the row from the index table after deleting the corresponding row from the data table
+ indexUpdatesItr.remove();
+ Put put = new Put(m.getRow());
+ put.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
+ indexUpdatesForDeletes.add(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()));
+ // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+ // going through concurrent updates
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+ if (!context.pendingRows.contains(rowKey)) {
+ context.intermediatePostIndexUpdates.add(next);
+ }
+ }
+ }
+ }
+ }
+ if (!localUpdates.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(0,
+ localUpdates.toArray(new Mutation[localUpdates.size()]));
+ }
+ // The pre-phase updates are the unverify puts generated for deletes plus whatever is left in indexUpdates
+ if (!indexUpdatesForDeletes.isEmpty()) {
+ context.preIndexUpdates = indexUpdatesForDeletes;
+ }
+
+ // Replace the immutable empty default with a mutable list before adding to it
+ if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
+ context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
+ }
+ for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
+ context.preIndexUpdates.add(update.getFirst());
+ }
+ }
+ }
+
+ /**
+ * Computes the index updates for a mini batch, writes the pre-index updates, and stages the
+ * post-index updates on the per-batch {@code BatchMutateContext}.
+ * <p>
+ * Sequence visible in this method:
+ * <ol>
+ * <li>create a {@code BatchMutateContext} and register it via {@code setBatchMutateContext};</li>
+ * <li>for non-replay writes ({@code replayWrite == null}), lock the rows and record them as pending;</li>
+ * <li>group the mutations and prepare the index mutations;</li>
+ * <li>release the row locks, then write the pre-index updates ({@code doPre});</li>
+ * <li>for non-replay writes, re-acquire the row locks, drop the staged post-index updates of rows
+ * whose pending-row timestamp is newer than this batch's {@code now}, and remove this batch's
+ * pending rows;</li>
+ * <li>copy the surviving staged updates into {@code context.postIndexUpdates}.</li>
+ * </ol>
+ * Row locks left in {@code context.rowLocks} are not released here; {@code postBatchMutateIndispensably}
+ * releases them.
+ *
+ * @param c observer context; used here to reach the region (for logging) and passed on to helpers
+ * @param miniBatchOp the mini batch being applied
+ * @throws Throwable nothing is caught in this method, so any failure raised by the helpers it calls
+ * propagates to the caller
+ */
+ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+ ignoreAtomicOperations(miniBatchOp);
+ BatchMutateContext context = new BatchMutateContext(this.builder.getIndexMetaData(miniBatchOp).getClientVersion());
+ setBatchMutateContext(c, context);
+ // The replay marker is read from the first mutation of the batch only
+ Mutation firstMutation = miniBatchOp.getOperation(0);
+ ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+ /*
+ * Exclusively lock all rows so we get a consistent read
+ * while determining the index updates
+ */
+ if (replayWrite == null) {
+ lockRows(miniBatchOp, context);
+ }
+ // "now" is the reference timestamp of this batch; it is compared against pending-row timestamps below
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+ // concurrent updates
+ if (replayWrite == null) {
+ populatePendingRows(context, now);
+ }
+ // First group all the updates for a single row into a single update to be processed
+ Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
+ // early exit if it turns out we don't have any edits
+ // NOTE(review): this return happens after populatePendingRows() but removePendingRows() is not
+ // called on this path in this method -- confirm the pending rows are cleaned up elsewhere
+ if (mutations == null) {
+ return;
+ }
+ prepareIndexMutations(c, miniBatchOp, context, mutations);
+ // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+ // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+ // can be prepared in less than one millisecond
+ if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
+ Thread.sleep(1);
+ LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ // Release the locks before making RPC calls for index updates
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ // Do the index updates
+ doPre(c, context, miniBatchOp);
+ if (replayWrite == null) {
+ // Acquire the locks again before letting the region proceed with data table updates
+ List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
+ for (RowLock rowLock : context.rowLocks) {
+ rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
+ }
+ // Swap the released lock handles for the freshly acquired ones
+ context.rowLocks.clear();
+ context.rowLocks = rowLocks;
+ // Check if we need to skip post index update for any of the row
+ Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
+ while (iterator.hasNext()) {
+ // Check if this row is going through another mutation which has a newer timestamp. If so,
+ // ignore the pending updates for this row
+ Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
+ // The second element of the outer pair is used as the row key for the pending-row lookup
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+ // NOTE(review): pendingRow is dereferenced below without a null check; this assumes every row key
+ // found in intermediatePostIndexUpdates was registered by populatePendingRows() -- confirm
+ PendingRow pendingRow = pendingRows.get(rowKey);
+ // Has any concurrent mutation arrived for the same row? if so, skip post index updates
+ // and let read repair resolve conflicts
+ if (pendingRow.getLatestTimestamp() > now) {
+ iterator.remove();
+ }
+ }
+ // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+ // the collection of pending rows
+ removePendingRows(context);
+ }
+ // Install a fresh list before adding when postIndexUpdates is still empty
+ // (presumably its initial value is a shared/immutable empty collection -- TODO confirm)
+ if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
+ context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
+ }
+ // Keep only the inner (Mutation, byte[]) pair of each surviving staged update
+ for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
+ context.postIndexUpdates.add(update.getFirst());
+ }
+ }
+
+ /**
+ * Stores the per-batch state in the {@code batchMutateContext} holder so later hooks can fetch it.
+ *
+ * @param c observer context (not used by this method)
+ * @param context the state to store
+ */
+ private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+     batchMutateContext.set(context);
+ }
+
+ /**
+ * Fetches the per-batch state currently held by the {@code batchMutateContext} holder.
+ *
+ * @param c observer context (not used by this method)
+ * @return whatever the holder returns; callers in this class treat {@code null} as "no batch in progress"
+ */
+ private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+     return batchMutateContext.get();
+ }
+
+ /**
+ * Clears the per-batch state from the {@code batchMutateContext} holder.
+ *
+ * @param c observer context (not used by this method)
+ */
+ private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+     batchMutateContext.remove();
+ }
+
+ /**
+ * Finishes a mini batch: releases the row locks held in the batch context, tells the index builder
+ * the batch completed and, when the batch succeeded, writes the post-index updates.
+ * <p>
+ * Returns immediately when the {@code disabled} flag is set or no batch context is registered.
+ * Otherwise the batch context is always removed and the elapsed time reported to the metrics,
+ * even if the work above throws.
+ *
+ * @param c observer context, passed on to the context accessors and to {@code doPost}
+ * @param miniBatchOp the mini batch that was applied
+ * @param success whether the batch succeeded; post-index updates are written only when {@code true}
+ * @throws IOException propagated from the builder notification or the post-index write
+ */
+ @Override
+ public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+         MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
+     if (disabled) {
+         return;
+     }
+     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+     final BatchMutateContext batchContext = getBatchMutateContext(c);
+     if (batchContext == null) {
+         return;
+     }
+     try {
+         for (RowLock heldLock : batchContext.rowLocks) {
+             heldLock.release();
+         }
+         builder.batchCompleted(miniBatchOp);
+
+         // The pre-index and data table updates are successful, and now, do post index updates
+         // (unless the test-only skip flag is set)
+         if (success && !skipPostIndexUpdatesForTesting) {
+             doPost(c, batchContext);
+         }
+     } finally {
+         removeBatchMutateContext(c);
+         final long elapsed = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+         final boolean slow = elapsed >= slowIndexWriteThreshold;
+         if (slow && LOG.isDebugEnabled()) {
+             LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", elapsed, slowIndexWriteThreshold));
+         }
+         if (slow) {
+             metricSource.incrementNumSlowIndexWriteCalls();
+         }
+         metricSource.updateIndexWriteTime(elapsed);
+     }
+ }
+
+ /**
+ * Writes the post-index updates held by the given batch context.
+ * <p>
+ * Any {@link Throwable} raised by the write is handed to {@code rethrowIndexingException}. Should that
+ * helper return normally, a {@link RuntimeException} is thrown so a failed write is never reported
+ * as a success.
+ *
+ * @param c observer context (not used by this method)
+ * @param context per-batch state carrying the post-index updates
+ * @throws IOException only reachable through {@code rethrowIndexingException}, since every failure
+ * of the write itself is caught here first
+ */
+ private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException {
+     try {
+         doIndexWritesWithExceptions(context, true);
+     } catch (Throwable failure) {
+         rethrowIndexingException(failure);
+         // Only reached if the helper above did not throw
+         throw new RuntimeException(
+                 "Somehow didn't complete the index update, but didn't return succesfully either!");
+     }
+ }
+
+ private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
+ throws IOException {
+ Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
+ //short circuit, if we don't need to do any work
+
+ if (context == null || indexUpdates.isEmpty()) {
+ return;
+ }
+
+ // get the current span, or just use a null-span to avoid a bunch of if statements
+ try (TraceScope scope = Trace.startSpan("Completing " + (post ? "post" : "pre") + " index writes")) {
+ Span current = scope.getSpan();
+ if (current == null) {
+ current = NullSpan.INSTANCE;
+ }
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ current.addTimelineAnnotation("Actually doing " + (post ? "post" : "pre") + " index update for first time");
+ if (post) {
+ postWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
+ } else {
+ preWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
+ }
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ if (duration >= slowIndexWriteThreshold) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold));
+ }
+ metricSource.incrementNumSlowIndexWriteCalls();
+ }
+ metricSource.updateIndexWriteTime(duration);
+ }
+ }
+
+ private void removePendingRows(BatchMutateContext context) {
+ for (RowLock rowLock : context.rowLocks) {
+ ImmutableBytesPtr rowKey = rowLock.getRowKey();
+ PendingRow pendingRow = pendingRows.get(rowKey);
+ if (pendingRow != null) {
+ pendingRow.remove();
+ if (pendingRow.getCount() == 0) {
+ pendingRows.remove(rowKey);
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes the pre-index updates held by the given batch context.
+ * <p>
+ * If the write raises any {@link Throwable}, the batch's rows are first removed from the pending
+ * rows and the failure is then handed to {@code rethrowIndexingException}. Should that helper
+ * return normally, a {@link RuntimeException} is thrown so a failed write is never reported as a
+ * success.
+ *
+ * @param c observer context (not used by this method)
+ * @param context per-batch state carrying the pre-index updates
+ * @param miniBatchOp the mini batch being applied (not used by this method)
+ * @throws IOException only reachable through {@code rethrowIndexingException}, since every failure
+ * of the write itself is caught here first
+ */
+ private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
+         MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+     try {
+         doIndexWritesWithExceptions(context, false);
+     } catch (Throwable failure) {
+         removePendingRows(context);
+         rethrowIndexingException(failure);
+         // Only reached if the helper above did not throw
+         throw new RuntimeException(
+                 "Somehow didn't complete the index update, but didn't return succesfully either!");
+     }
+ }
+
+ /**
+ * After the region opens, replays index edits recorded in {@code failedIndexEdits} for this region.
+ * <p>
+ * The edits are fetched before the {@code disabled} check. When there are edits they are written
+ * with {@code preWriter}; an {@link IOException} from that write is logged and not rethrown. The
+ * elapsed time is always reported to the metric source.
+ *
+ * @param c observer context giving access to the opened region
+ */
+ @Override
+ public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
+     // Fetched unconditionally, ahead of the disabled check, exactly as before
+     Multimap<HTableInterfaceReference, Mutation> pendingEdits =
+             failedIndexEdits.getEdits(c.getEnvironment().getRegion());
+
+     if (disabled) {
+         return;
+     }
+
+     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+     try {
+         //only replay when there are pending edits to complete
+         final boolean hasPendingEdits = pendingEdits != null && pendingEdits.size() != 0;
+         if (hasPendingEdits) {
+             LOG.info("Found some outstanding index updates that didn't succeed during"
+                     + " WAL replay - attempting to replay now.");
+
+             // do the usual preWriter stuff
+             try {
+                 preWriter.writeAndHandleFailure(pendingEdits, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
+             } catch (IOException e) {
+                 LOG.error("During WAL replay of outstanding index updates, "
+                         + "Exception is thrown instead of killing server during index writing", e);
+             }
+         }
+     } finally {
+         final long elapsed = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+         final boolean slow = elapsed >= slowPostOpenThreshold;
+         if (slow && LOG.isDebugEnabled()) {
+             LOG.debug(getCallTooSlowMessage("postOpen", elapsed, slowPostOpenThreshold));
+         }
+         if (slow) {
+             metricSource.incrementNumSlowPostOpenCalls();
+         }
+         metricSource.updatePostOpenTime(elapsed);
+     }
+ }
+
+ /**
+ * Test hook: exposes the index builder that {@code builder} is currently using.
+ * @return the value of {@code builder.getBuilderForTesting()}
+ */
+ public IndexBuilder getBuilderForTesting() {
+     final IndexBuilder current = builder.getBuilderForTesting();
+     return current;
+ }
+
+ /**
+ * Enable indexing on the given table by registering {@code IndexRegionObserver} as a coprocessor.
+ * <p>
+ * The builder class name is stored under {@code INDEX_BUILDER_CONF_KEY} in the properties. A
+ * non-null {@code properties} map is modified in place; a new map is created when it is {@code null}.
+ * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
+ * @param builder class to use when building the index for this table
+ * @param properties map of custom configuration options to make available to your
+ * {@link IndexBuilder} on the server-side; may be {@code null}
+ * @param priority coprocessor priority, passed unchanged to {@code desc.addCoprocessor}
+ * @throws IOException the Indexer coprocessor cannot be added
+ */
+ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder,
+         Map<String, String> properties, int priority) throws IOException {
+     final Map<String, String> coprocessorProps =
+             (properties != null) ? properties : new HashMap<String, String>();
+     coprocessorProps.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
+     desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, coprocessorProps);
+ }
+}
+
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
index 02e4c3c..4cc7b23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
@@ -51,16 +51,13 @@ public class LockManager {
/**
* Lock the row or throw otherwise
- * @param row the row key
+ * @param rowKey the row key
* @return RowLock used to eventually release the lock
* @throws TimeoutIOException if the lock could not be acquired within the
* allowed rowLockWaitDuration and InterruptedException if interrupted while
* waiting to acquire lock.
*/
- public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
- // create an object to use a a key in the row lock map
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
-
+ public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws IOException {
RowLockContext rowLockContext = null;
RowLockImpl result = null;
TraceScope traceScope = null;
@@ -116,6 +113,12 @@ public class LockManager {
}
}
+ public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
+ // create an object to use as a key in the row lock map, then delegate to the ImmutableBytesPtr overload
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
+ return lockRow(rowKey, waitDuration);
+ }
+
/**
* Unlock the row. We need this stateless way of unlocking because
* we have no means of passing the RowLock instances between
@@ -226,6 +229,11 @@ public class LockManager {
}
@Override
+ public ImmutableBytesPtr getRowKey() {
+ return context.rowKey;
+ }
+
+ @Override
public String toString() {
return "RowLockImpl{" +
"context=" + context +
@@ -247,6 +255,8 @@ public class LockManager {
* thread
*/
void release();
+
+ ImmutableBytesPtr getRowKey();
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f13e97a..59b7619 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -87,13 +87,13 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
* @throws IOException
*/
@Override
- public boolean isEnabled(Mutation m) throws IOException {
+ public boolean isEnabled(Mutation m) {
// ask the codec to see if we should even attempt indexing
return this.codec.isEnabled(m);
}
@Override
- public boolean isAtomicOp(Mutation m) throws IOException {
+ public boolean isAtomicOp(Mutation m) {
return false;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
index 7489a8c..89c751e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -28,10 +28,9 @@ public abstract class BaseIndexCodec implements IndexCodec {
* <p>
* By default, the codec is always enabled. Subclasses should override this method if they want do
* decide to index on a per-mutation basis.
- * @throws IOException
*/
@Override
- public boolean isEnabled(Mutation m) throws IOException {
+ public boolean isEnabled(Mutation m) {
return true;
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 07a05bc..6130093 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -79,7 +79,7 @@ public class IndexBuildManager implements Stoppable {
return this.delegate.getIndexMetaData(miniBatchOp);
}
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
+ public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
// notify the delegate that we have started processing a batch
@@ -87,7 +87,7 @@ public class IndexBuildManager implements Stoppable {
this.delegate.batchStarted(miniBatchOp, indexMetaData);
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
- ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+ ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
@@ -96,6 +96,30 @@ public class IndexBuildManager implements Stoppable {
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
}
}
+ for (Pair<Mutation, byte[]> update : updates) {
+ results.add(new Pair<>(update, m.getRow()));
+ }
+ }
+ return results;
+ }
+
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ Collection<? extends Mutation> mutations) throws Throwable {
+ // notify the delegate that we have started processing a batch
+ final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
+ this.delegate.batchStarted(miniBatchOp, indexMetaData);
+
+ // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+ ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+ for (Mutation m : mutations) {
+ Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+ if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
+ for (Pair<Mutation, byte[]> update : updates) {
+ update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+ }
+ }
results.addAll(updates);
}
return results;
@@ -116,11 +140,11 @@ public class IndexBuildManager implements Stoppable {
delegate.batchStarted(miniBatchOp, indexMetaData);
}
- public boolean isEnabled(Mutation m) throws IOException {
+ public boolean isEnabled(Mutation m) {
return delegate.isEnabled(m);
}
- public boolean isAtomicOp(Mutation m) throws IOException {
+ public boolean isAtomicOp(Mutation m) {
return delegate.isAtomicOp(m);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index a00294c..d3b9d6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -132,7 +132,7 @@ public interface IndexBuilder extends Stoppable {
* basis, as each codec is instantiated per-region.
* @throws IOException
*/
- public boolean isEnabled(Mutation m) throws IOException;
+ public boolean isEnabled(Mutation m);
/**
* True if mutation has an ON DUPLICATE KEY clause
@@ -140,7 +140,7 @@ public interface IndexBuilder extends Stoppable {
* @return true if mutation has ON DUPLICATE KEY expression and false otherwise.
* @throws IOException
*/
- public boolean isAtomicOp(Mutation m) throws IOException;
+ public boolean isAtomicOp(Mutation m);
/**
* Calculate the mutations based on the ON DUPLICATE KEY clause
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index 7dde941..9028ec7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -78,7 +78,7 @@ public interface IndexCodec {
* codec is instantiated per-region.
* @throws IOException
*/
- public boolean isEnabled(Mutation m) throws IOException;
+ public boolean isEnabled(Mutation m);
public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName);
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
similarity index 73%
copy from phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
copy to phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
index 290e1be..9e94e87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
@@ -1,11 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.phoenix.hbase.index.write;
@@ -14,20 +22,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
@@ -42,45 +47,42 @@ import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Multimap;
/**
- * Write index updates to the index tables in parallel. We attempt to early exit from the writes if any of the index
- * updates fails. Completion is determined by the following criteria: *
- * <ol>
- * <li>All index writes have returned, OR</li>
- * <li>Any single index write has failed</li>
- * </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes to ensure a timely
- * recovery of the failed index writes.
+ * Abstract class to Write index updates to the index tables in parallel.
*/
-public class ParallelWriterIndexCommitter implements IndexCommitter {
+public abstract class AbstractParallelWriterIndexCommitter implements IndexCommitter {
public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
- private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+ private static final Log LOG = LogFactory.getLog(AbstractParallelWriterIndexCommitter.class);
- private HTableFactory retryingFactory;
- private HTableFactory noRetriesfactory;
- private Stoppable stopped;
- private QuickFailingTaskRunner pool;
- private KeyValueBuilder kvBuilder;
- private RegionCoprocessorEnvironment env;
+ protected HTableFactory retryingFactory;
+ protected HTableFactory noRetriesFactory;
+ protected Stoppable stopped;
+ protected QuickFailingTaskRunner pool;
+ protected KeyValueBuilder kvBuilder;
+ protected RegionCoprocessorEnvironment env;
+ protected TaskBatch<Void> tasks;
+ protected boolean disableIndexOnFailure = false;
- public ParallelWriterIndexCommitter() {}
+
+ public AbstractParallelWriterIndexCommitter() {}
// For testing
- public ParallelWriterIndexCommitter(String hbaseVersion) {
+ public AbstractParallelWriterIndexCommitter(String hbaseVersion) {
kvBuilder = KeyValueBuilder.get(hbaseVersion);
}
@Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) {
this.env = env;
+ this.disableIndexOnFailure = disableIndexOnFailure;
Configuration conf = env.getConfiguration();
setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
ThreadPoolManager.getExecutor(
new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
- INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+ INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
}
@@ -89,11 +91,12 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
* <p>
* Exposed for TESTING
*/
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
+ public void setup(HTableFactory factory, ExecutorService pool,Stoppable stop, RegionCoprocessorEnvironment env) {
this.retryingFactory = factory;
- this.noRetriesfactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
+ this.noRetriesFactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
this.pool = new QuickFailingTaskRunner(pool);
this.stopped = stop;
+ this.env = env;
}
@Override
@@ -109,7 +112,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
*/
Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+ tasks = new TaskBatch<Void>(entries.size());
for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
// get the mutations for each table. We leak the implementation here a little bit to save
// doing a complete copy over of all the index update for each table.
@@ -118,7 +121,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
if (env != null
&& !allowLocalUpdates
&& tableReference.getTableName().equals(
- env.getRegion().getTableDesc().getNameAsString())) {
+ env.getRegion().getTableDesc().getTableName().getNameAsString())) {
continue;
}
/*
@@ -140,6 +143,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
@SuppressWarnings("deprecation")
@Override
public Void call() throws Exception {
+ Table table = null;
// this may have been queued, so another task infront of us may have failed, so we should
// early exit, if that's the case
throwFailureIfDone();
@@ -147,29 +151,34 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
if (LOG.isTraceEnabled()) {
LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
}
- HTableInterface table = null;
try {
if (allowLocalUpdates
&& env != null
&& tableReference.getTableName().equals(
- env.getRegion().getTableDesc().getNameAsString())) {
+ env.getRegion().getTableDesc().getTableName().getNameAsString())) {
try {
throwFailureIfDone();
IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
return null;
- } catch (IOException ignord) {
+ } catch (IOException ignored) {
// when it's failed we fall back to the standard & slow way
if (LOG.isDebugEnabled()) {
LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
- + ignord);
+ + ignored);
}
}
}
- // if the client can retry index writes, then we don't need to retry here
- HTableFactory factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesfactory;
+ // if the client can retry index writes, then we don't need to retry here
+ HTableFactory factory;
+ if (disableIndexOnFailure) {
+ factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
+ }
+ else {
+ factory = retryingFactory;
+ }
table = factory.getTable(tableReference.get());
throwFailureIfDone();
- table.batch(mutations);
+ table.batch(mutations, null);
} catch (SingleIndexWriteFailureException e) {
throw e;
} catch (IOException e) {
@@ -194,20 +203,9 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
}
});
}
-
- // actually submit the tasks to the pool and wait for them to finish/fail
- try {
- pool.submitUninterruptible(tasks);
- } catch (EarlyExitFailure e) {
- propagateFailure(e);
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- propagateFailure(e.getCause());
- }
-
}
- private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+ protected void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
try {
throw throwable;
} catch (SingleIndexWriteFailureException e1) {
@@ -232,11 +230,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
this.pool.stop(why);
this.retryingFactory.shutdown();
- this.noRetriesfactory.shutdown();
+ this.noRetriesFactory.shutdown();
}
@Override
public boolean isStopped() {
return this.stopped.isStopped();
}
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
index e9dc202..1eb9ff1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
*/
public interface IndexCommitter extends Stoppable {
- void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
+ void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure);
public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates, int clientVersion)
throws IndexWriteException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index c28288c..6c5c57c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -39,8 +39,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
/**
- * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
- * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
+ * Do the actual work of writing to the index tables.
* <p>
* We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
* threads, so it will not block the region from shutting down.
@@ -59,12 +58,15 @@ public class IndexWriter implements Stoppable {
* instantiated
*/
public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
- this(getCommitter(env), getFailurePolicy(env), env, name);
+ this(getCommitter(env), getFailurePolicy(env), env, name, true);
}
- public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
- this(getCommitter(env), failurePolicy, env, name);
- }
+ public IndexWriter(RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) throws IOException {
+ this(getCommitter(env), getFailurePolicy(env), env, name, disableIndexOnFailure);
+ }
+ public IndexWriter(RegionCoprocessorEnvironment env, IndexCommitter indexCommitter, String name, boolean disableIndexOnFailure) throws IOException {
+ this(indexCommitter, getFailurePolicy(env), env, name, disableIndexOnFailure);
+ }
public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
return getCommitter(env,TrackingParallelWriterIndexCommitter.class);
@@ -99,6 +101,12 @@ public class IndexWriter implements Stoppable {
}
}
+ public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
+ RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) {
+ this(committer, policy);
+ this.writer.setup(this, env, name, disableIndexOnFailure);
+ this.failurePolicy.setup(this, env);
+ }
/**
* Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected
* to be fully setup before calling.
@@ -109,7 +117,7 @@ public class IndexWriter implements Stoppable {
public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
RegionCoprocessorEnvironment env, String name) {
this(committer, policy);
- this.writer.setup(this, env, name);
+ this.writer.setup(this, env, name, true);
this.failurePolicy.setup(this, env);
}
@@ -131,27 +139,64 @@ public class IndexWriter implements Stoppable {
* write the index updates. When we return depends on the specified {@link IndexCommitter}.
* <p>
* If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
- * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
- * which ensures that the server crashes when an index write fails, ensuring that we get WAL
- * replay of the index edits.
+ * then decides how to handle the failure.
* @param indexUpdates Updates to write
* @param clientVersion version of the client
* @throws IOException
*/
- public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
- boolean allowLocalUpdates, int clientVersion) throws IOException {
+ public void writeAndHandleFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
+ boolean allowLocalUpdates, int clientVersion) throws IOException {
// convert the strings to htableinterfaces to which we can talk and group by TABLE
Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
- writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates, clientVersion);
+ writeAndHandleFailure(toWrite, allowLocalUpdates, clientVersion);
+ }
+
+ /**
+ * see {@link #writeAndHandleFailure(Collection, boolean, int)}.
+ * @param toWrite
+ * @throws IOException
+ */
+ public void writeAndHandleFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
+ boolean allowLocalUpdates, int clientVersion) throws IOException {
+ try {
+ write(toWrite, allowLocalUpdates, clientVersion);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Done writing all index updates!\n\t" + toWrite);
+ }
+ } catch (Exception e) {
+ this.failurePolicy.handleFailure(toWrite, e);
+ }
+ }
+
+ /**
+ * Write the mutations to their respective table.
+ * <p>
+ * This method is blocking and could potentially cause the writer to block for a long time as we
+ * write the index updates. When we return depends on the specified {@link IndexCommitter}.
+ * <p>
+ * If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
+ * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
+ * which ensures that the server crashes when an index write fails, ensuring that we get WAL
+ * replay of the index edits.
+ * @param indexUpdates Updates to write
+ * @param clientVersion version of the client
+ * @throws IOException
+ */
+ public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
+ boolean allowLocalUpdates, int clientVersion) throws IOException {
+ // convert the strings to htableinterfaces to which we can talk and group by TABLE
+ Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
+ // Delegate exactly once; also calling writeAndHandleFailure here would write every update twice
+ writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates, clientVersion);
}
/**
* see {@link #writeAndKillYourselfOnFailure(Collection)}.
* @param toWrite
- * @throws IOException
+ * @throws IOException
*/
- public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
- boolean allowLocalUpdates, int clientVersion) throws IOException {
+ public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
+ boolean allowLocalUpdates, int clientVersion) throws IOException {
try {
write(toWrite, allowLocalUpdates, clientVersion);
if (LOG.isTraceEnabled()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
similarity index 58%
copy from phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
copy to phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
index 7489a8c..f070278 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
@@ -15,23 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.hbase.index.builder;
+package org.apache.phoenix.hbase.index.write;
-import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but does not block
+ *
+ *
+ */
+public class LazyParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
+
+ // for testing
+ public LazyParallelWriterIndexCommitter(String hbaseVersion) {
+ super(hbaseVersion);
+ }
-public abstract class BaseIndexCodec implements IndexCodec {
- /**
- * {@inheritDoc}
- * <p>
- * By default, the codec is always enabled. Subclasses should override this method if they want do
- * decide to index on a per-mutation basis.
- * @throws IOException
- */
- @Override
- public boolean isEnabled(Mutation m) throws IOException {
- return true;
+ public LazyParallelWriterIndexCommitter() {
+ super();
}
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 290e1be..24569a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -9,35 +9,14 @@
*/
package org.apache.phoenix.hbase.index.write;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
-import org.apache.phoenix.hbase.index.parallel.Task;
-import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
-import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
-import org.apache.phoenix.util.IndexUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Multimap;
@@ -51,192 +30,29 @@ import com.google.common.collect.Multimap;
* We attempt to quickly determine if any write has failed and not write to the remaining indexes to ensure a timely
* recovery of the failed index writes.
*/
-public class ParallelWriterIndexCommitter implements IndexCommitter {
-
- public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
- private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
-
- private HTableFactory retryingFactory;
- private HTableFactory noRetriesfactory;
- private Stoppable stopped;
- private QuickFailingTaskRunner pool;
- private KeyValueBuilder kvBuilder;
- private RegionCoprocessorEnvironment env;
+public class ParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParallelWriterIndexCommitter.class);
public ParallelWriterIndexCommitter() {}
// For testing
public ParallelWriterIndexCommitter(String hbaseVersion) {
- kvBuilder = KeyValueBuilder.get(hbaseVersion);
- }
-
- @Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
- this.env = env;
- Configuration conf = env.getConfiguration();
- setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
- ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
- INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
- this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
- }
-
- /**
- * Setup <tt>this</tt>.
- * <p>
- * Exposed for TESTING
- */
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
- this.retryingFactory = factory;
- this.noRetriesfactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
- this.pool = new QuickFailingTaskRunner(pool);
- this.stopped = stop;
+ super(hbaseVersion);
}
@Override
public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates, final int clientVersion) throws SingleIndexWriteFailureException {
- /*
- * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
- * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
- * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
- * successfully updated, or (2) any one of the index table writes fail; in either case, we should return as
- * quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the
- * index writes hasn't been started yet (its been queued up, but not submitted to a thread) we want to that task
- * to fail immediately as we know that write is a waste and will need to be replayed anyways.
- */
-
- Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
- for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
- // get the mutations for each table. We leak the implementation here a little bit to save
- // doing a complete copy over of all the index update for each table.
- final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
- final HTableInterfaceReference tableReference = entry.getKey();
- if (env != null
- && !allowLocalUpdates
- && tableReference.getTableName().equals(
- env.getRegion().getTableDesc().getNameAsString())) {
- continue;
- }
- /*
- * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
- * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
- * The former will only work if we are not in the midst of writing the current batch to the table, though we
- * do check these status variables before starting and before writing the batch. The latter usage,
- * interrupting the thread, will work in the previous situations as was at some points while writing the
- * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
- * elaborate when is supports an interrupt).
- */
- tasks.add(new Task<Void>() {
-
- /**
- * Do the actual write to the primary table.
- *
- * @return
- */
- @SuppressWarnings("deprecation")
- @Override
- public Void call() throws Exception {
- // this may have been queued, so another task infront of us may have failed, so we should
- // early exit, if that's the case
- throwFailureIfDone();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
- }
- HTableInterface table = null;
- try {
- if (allowLocalUpdates
- && env != null
- && tableReference.getTableName().equals(
- env.getRegion().getTableDesc().getNameAsString())) {
- try {
- throwFailureIfDone();
- IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
- return null;
- } catch (IOException ignord) {
- // when it's failed we fall back to the standard & slow way
- if (LOG.isDebugEnabled()) {
- LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
- + ignord);
- }
- }
- }
- // if the client can retry index writes, then we don't need to retry here
- HTableFactory factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesfactory;
- table = factory.getTable(tableReference.get());
- throwFailureIfDone();
- table.batch(mutations);
- } catch (SingleIndexWriteFailureException e) {
- throw e;
- } catch (IOException e) {
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
- } catch (InterruptedException e) {
- // reset the interrupt status on the thread
- Thread.currentThread().interrupt();
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
- }
- finally{
- if (table != null) {
- table.close();
- }
- }
- return null;
- }
-
- private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null); }
-
- }
- });
- }
+ super.write(toWrite, allowLocalUpdates, clientVersion);
// actually submit the tasks to the pool and wait for them to finish/fail
try {
pool.submitUninterruptible(tasks);
} catch (EarlyExitFailure e) {
propagateFailure(e);
} catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
+ LOGGER.error("Found a failed index update!");
propagateFailure(e.getCause());
}
}
-
- private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
- try {
- throw throwable;
- } catch (SingleIndexWriteFailureException e1) {
- throw e1;
- } catch (Throwable e1) {
- throw new SingleIndexWriteFailureException("Got an abort notification while writing to the index!", e1);
- }
-
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed by the external
- * {@link Stoppable}. This call does not delegate the stop down to the {@link Stoppable} passed in the constructor.
- *
- * @param why
- * the reason for stopping
- */
- @Override
- public void stop(String why) {
- LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
- this.pool.stop(why);
- this.retryingFactory.shutdown();
- this.noRetriesfactory.shutdown();
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.isStopped();
- }
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 4fa2d0b..fe13025 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -81,6 +81,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
private Stoppable stopped;
private RegionCoprocessorEnvironment env;
private KeyValueBuilder kvBuilder;
+ protected boolean disableIndexOnFailure = false;
// for testing
public TrackingParallelWriterIndexCommitter(String hbaseVersion) {
@@ -91,8 +92,9 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
}
@Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) {
this.env = env;
+ this.disableIndexOnFailure = disableIndexOnFailure;
Configuration conf = env.getConfiguration();
setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
ThreadPoolManager.getExecutor(
@@ -179,7 +181,13 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
}
// if the client can retry index writes, then we don't need to retry here
- HTableFactory factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
+ HTableFactory factory;
+ if (disableIndexOnFailure) {
+ factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
+ }
+ else {
+ factory = retryingFactory;
+ }
table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
@@ -233,7 +241,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
if (failures.size() > 0) {
// make the list unmodifiable to avoid any more synchronization concerns
throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures),
- PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
+ disableIndexOnFailure && PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
}
return;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
new file mode 100644
index 0000000..5be7ae8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VERIFY_COLUMN;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+/**
+ *
+ * Coprocessor that verifies the scanned rows of a non-transactional global index.
+ *
+ */
+public class GlobalIndexChecker extends BaseRegionObserver {
+ private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
+ /**
+ * Class that verifies a given row of a non-transactional global index.
+ * An instance of this class is created for each scanner on an index
+ * and used to verify individual rows and rebuild them if they are not valid
+ */
+ private static class GlobalIndexScanner implements RegionScanner {
+ RegionScanner scanner;
+ private long ageThreshold;
+ private int repairCount;
+ private Scan scan;
+ private Scan indexScan;
+ private Scan buildIndexScan = null;
+ private Table dataHTable = null;
+ private byte[] emptyCF;
+ private byte[] emptyCQ;
+ private IndexMaintainer indexMaintainer = null;
+ private byte[][] viewConstants = null;
+ private RegionCoprocessorEnvironment env;
+ private Region region;
+ private long minTimestamp;
+ private long maxTimestamp;
+
+ public GlobalIndexScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner) throws IOException {
+ this.env = env;
+ this.scan = scan;
+ region = env.getRegion();
+ this.scanner = scanner;
+ emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+ emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+ ageThreshold = env.getConfiguration().getLong(
+ QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+ repairCount = env.getConfiguration().getInt(
+ QueryServices.GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB,
+ QueryServicesOptions.DEFAULT_GLOBAL_INDEX_REPAIR_COUNT);
+ minTimestamp = scan.getTimeRange().getMin();
+ maxTimestamp = scan.getTimeRange().getMax();
+ }
+
+ @Override
+ public int getBatch() {
+ return scanner.getBatch();
+ }
+
+ @Override
+ public long getMaxResultSize() {
+ return scanner.getMaxResultSize();
+ }
+
+ @Override
+ public boolean next(List<Cell> result) throws IOException {
+ try {
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(result);
+ if (result.isEmpty()) {
+ break;
+ }
+ if (verifyRowAndRepairIfNecessary(result)) {
+ break;
+ }
+ // skip this row as it is invalid
+ // if there is no more row, then result will be an empty list
+ } while (hasMore);
+ return hasMore;
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ throw new IOException("next with scannerContext should not be called in Phoenix environment");
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+ }
+
+ /**
+ * Closes the wrapped index scanner and, if one was opened for repairs, the data table handle.
+ * @throws IOException if closing the scanner or the data table fails
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ scanner.close();
+ } finally {
+ // Release the data table handle even when closing the wrapped scanner throws
+ if (dataHTable != null) {
+ dataHTable.close();
+ }
+ }
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return scanner.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() throws IOException {
+ return scanner.isFilterDone();
+ }
+
+ @Override
+ public boolean reseek(byte[] row) throws IOException {
+ return scanner.reseek(row);
+ }
+
+ @Override
+ public long getMvccReadPoint() {
+ return scanner.getMvccReadPoint();
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result) throws IOException {
+ try {
+ boolean hasMore;
+ do {
+ hasMore = scanner.nextRaw(result);
+ if (result.isEmpty()) {
+ break;
+ }
+ if (verifyRowAndRepairIfNecessary(result)) {
+ break;
+ }
+ // skip this row as it is invalid
+ // if there is no more row, then result will be an empty list
+ } while (hasMore);
+ return hasMore;
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ private void deleteRowIfAgedEnough(byte[] indexRowKey, long ts) throws IOException {
+ if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+ Delete del = new Delete(indexRowKey, ts);
+ Mutation[] mutations = new Mutation[]{del};
+ region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+ }
+
+ private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
+ // Build the data table row key from the index table row key
+ if (buildIndexScan == null) {
+ buildIndexScan = new Scan();
+ indexScan = new Scan(scan);
+ byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+ byte[] indexTableName = region.getRegionInfo().getTable().getName();
+ dataHTable = ServerUtil.getHTableForCoprocessorScan(env, dataTableName);
+ if (indexMaintainer == null) {
+ byte[] md = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+ List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(md, true);
+ indexMaintainer = getIndexMaintainer(maintainers, indexTableName);
+ }
+ if (indexMaintainer == null) {
+ throw new DoNotRetryIOException(
+ "repairIndexRows: IndexMaintainer is not included in scan attributes for " +
+ region.getRegionInfo().getTable().getNameAsString());
+ }
+ if (viewConstants == null) {
+ viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+ }
+ // The following attributes are set to instruct UngroupedAggregateRegionObserver to do partial index rebuild
+ // i.e., rebuild a subset of index rows.
+ buildIndexScan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES);
+ buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
+ buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+ buildIndexScan.setAttribute(BaseScannerRegionObserver.SCAN_LIMIT, Bytes.toBytes(repairCount));
+ buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+ }
+ // Rebuild the index rows from the corresponding the rows in the the data table
+ byte[] dataRowKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
+ buildIndexScan.setStartRow(dataRowKey);
+ buildIndexScan.setTimeRange(ts, maxTimestamp);
+ buildIndexScan.setRaw(true);
+ try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
+ resultScanner.next();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+ }
+ // Close the current scanner as the newly build row will not be visible to it
+ scanner.close();
+ // Open a new scanner starting from the current row
+ indexScan.setStartRow(indexRowKey);
+ scanner = region.getScanner(indexScan);
+ // Scan the newly build index rows
+ scanner.next(row);
+ if (row.isEmpty()) {
+ return;
+ }
+ // Check if the corresponding data table row exist
+ if (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
+ indexRowKey, 0, indexRowKey.length) == 0) {
+ if (!verifyRowAndRemoveEmptyColumn(row)) {
+ // The corresponding row does not exist in the data table.
+ // Need to delete the row from index if it is old enough
+ deleteRowIfAgedEnough(indexRowKey, ts);
+ row.clear();
+ }
+ return;
+ }
+ // This means the current index row is deleted by the rebuild process and we got the next row.
+ // If it is verified then we are good to go. If not, then we need to repair the new row
+ if (!verifyRowAndRemoveEmptyColumn(row)) {
+ // Rewind the scanner and let the row be scanned again so that it can be repaired
+ scanner.close();
+ scanner = region.getScanner(indexScan);
+ row.clear();
+ }
+ }
+
+ private boolean isEmptyColumn(Cell cell) {
+ return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ emptyCF, 0, emptyCF.length) == 0 &&
+ Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ emptyCQ, 0, emptyCQ.length) == 0;
+ }
+
+ /**
+ * Fallback verification used when the scanned cells did not include the empty column: reads the
+ * empty column of the given row directly from this region, within the time range of the scan.
+ * @param rowKey the index row key to check
+ * @return false if the empty column is absent or its value equals UNVERIFIED_BYTES, otherwise true
+ * @throws IOException if reading from the region fails
+ */
+ private boolean verifyRow(byte[] rowKey) throws IOException {
+ LOG.warn("Scan " + scan + " did not return the empty column for " + region.getRegionInfo().getTable().getNameAsString());
+ Get get = new Get(rowKey);
+ get.setTimeRange(minTimestamp, maxTimestamp);
+ get.addColumn(emptyCF, emptyCQ);
+ Result result = region.get(get);
+ if (result.isEmpty()) {
+ LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
+ return false;
+ }
+ // Compare against the full stored value, as verifyRowAndRemoveEmptyColumn does, instead of
+ // assuming the value is exactly UNVERIFIED_BYTES.length bytes long
+ byte[] value = result.getValue(emptyCF, emptyCQ);
+ return Bytes.compareTo(value, 0, value.length,
+ UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) != 0;
+ }
+
+ private boolean verifyRowAndRemoveEmptyColumn(List<Cell> cellList) throws IOException {
+ long cellListSize = cellList.size();
+ Cell cell = null;
+ if (cellListSize == 0) {
+ return true;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ cell = cellIterator.next();
+ if (isEmptyColumn(cell)) {
+ // Before PHOENIX-5156, the empty column value was set to 'x'. With PHOENIX-5156, it is now
+ // set to VERIFIED (1) and UNVERIFIED (2). In order to skip the index rows that are inserted before PHOENIX-5156
+ // we consider anything that is not UNVERIFIED means VERIFIED. IndexTool should be used to
+ // rebuild old rows to ensure their correctness after the PHOENIX-5156 upgrade
+ if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+ return false;
+ }
+ // Empty column is not supposed to be returned to the client except it is the only column included
+ // in the scan
+ if (cellListSize > 1) {
+ cellIterator.remove();
+ }
+ return true;
+ }
+ }
+ byte[] rowKey = new byte[cell.getRowLength()];
+ System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+ return verifyRow(rowKey);
+ }
+
+ private long getMaxTimestamp(List<Cell> cellList) {
+ long maxTs = 0;
+ long ts = 0;
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ ts = cell.getTimestamp();
+ if (ts > maxTs) {
+ maxTs = ts;
+ }
+ }
+ return maxTs;
+ }
+
+ /**
+ * @param cellList is an input and output parameter and will either include a valid row or be an empty list
+ * @return true if cellList holds a valid (verified or successfully repaired) row, false if the row is invalid and should be skipped
+ * @throws IOException
+ */
+ private boolean verifyRowAndRepairIfNecessary(List<Cell> cellList) throws IOException {
+ Cell cell = cellList.get(0);
+ if (verifyRowAndRemoveEmptyColumn(cellList)) {
+ return true;
+ } else {
+ byte[] rowKey = new byte[cell.getRowLength()];
+ System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+ long ts = getMaxTimestamp(cellList);
+ cellList.clear();
+ repairIndexRows(rowKey, ts, cellList);
+ if (cellList.isEmpty()) {
+ // This means that the index row is invalid. Return false to tell the caller that this row should be skipped
+ return false;
+ }
+ return true;
+ }
+ }
+ }
+
+ @Override
+ public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Scan scan, RegionScanner s) throws IOException {
+ if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
+ return s;
+ }
+ return new GlobalIndexScanner(c.getEnvironment(), scan, s);
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index d94d187..cd7b97e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -313,6 +313,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return maintainers;
}
+ /**
+  * Looks up the maintainer that belongs to a given index table.
+  *
+  * @param maintainers the index maintainers to search
+  * @param indexTableName the index table name, compared byte-wise with each maintainer's index table name
+  * @return the first maintainer whose index table name equals {@code indexTableName}, or null if none matches
+  */
+ public static IndexMaintainer getIndexMaintainer(List<IndexMaintainer> maintainers, byte[] indexTableName) {
+     for (IndexMaintainer candidate : maintainers) {
+         boolean sameIndexTable = Bytes.compareTo(indexTableName, candidate.getIndexTableName()) == 0;
+         if (sameIndexTable) {
+             return candidate;
+         }
+     }
+     return null;
+ }
+
private byte[] viewIndexId;
private boolean isMultiTenant;
// indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 585631d..7a706bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -109,7 +109,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
}
@Override
- public boolean isAtomicOp(Mutation m) throws IOException {
+ public boolean isAtomicOp(Mutation m) {
return m.getAttribute(ATOMIC_OP_ATTRIB) != null;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index d33e3fe..8171b0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -125,7 +125,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
}
@Override
- public boolean isEnabled(Mutation m) throws IOException {
+ public boolean isEnabled(Mutation m) {
return hasIndexMaintainers(m.getAttributesMap());
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index e6b94fb..8a15074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
@@ -133,6 +134,7 @@ public class TableResultIterator implements ResultIterator {
this.caches = caches;
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
+ IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dff8894..72aaa41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -173,11 +173,13 @@ import org.apache.phoenix.exception.UpgradeInProgressException;
import org.apache.phoenix.exception.UpgradeNotRequiredException;
import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixTransactionalIndexer;
@@ -844,6 +846,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
try {
+ boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
+ boolean isTransactional =
+ Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx;
+
+ boolean globalIndexerEnabled = config.getBoolean(
+ QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED);
+
+ if (tableType == PTableType.INDEX && !isTransactional) {
+ if (globalIndexerEnabled && !descriptor.hasCoprocessor(GlobalIndexChecker.class.getName())) {
+ descriptor.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null);
+ }
+ }
+
if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) {
descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null);
}
@@ -857,9 +873,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
}
// For ALTER TABLE
- boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
- boolean isTransactional =
- Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx;
+
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -875,15 +889,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (descriptor.hasCoprocessor(Indexer.class.getName())) {
descriptor.removeCoprocessor(Indexer.class.getName());
}
+ if (descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ descriptor.removeCoprocessor(IndexRegionObserver.class.getName());
+ }
} else {
// If exception on alter table to transition back to non transactional
if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
}
- if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ if (globalIndexerEnabled) {
+ if (!descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ IndexRegionObserver.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
+
+ } else {
+ if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b31175a..3862c94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -151,6 +151,9 @@ public interface QueryConstants {
public static final ImmutableBytesPtr DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
+ public static final String GLOBAL_INDEX_VERIFIED_COLUMN_QUALIFIER = EMPTY_COLUMN_NAME;
+ public static final byte[] GLOBAL_INDEX_VERIFIED_COLUMN_NAME_BYTES = Bytes.toBytes(GLOBAL_INDEX_VERIFIED_COLUMN_QUALIFIER);
+ ;
public static final String ALL_FAMILY_PROPERTIES_KEY = "";
public static final String SYSTEM_TABLE_PK_NAME = "pk";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index da81575..29b0a36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -318,6 +318,13 @@ public interface QueryServices extends SQLCloseable {
public static final String LOG_BUFFER_WAIT_STRATEGY = "phoenix.log.wait.strategy";
public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate";
+ // The minimum age of an unverified global index row to be eligible for deletion
+ public static final String GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.global.index.row.age.threshold.to.delete.ms";
+ // The maximum number of global index rows to be rebuilt at a time
+ public static final String GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB = "phoenix.global.index.row.repair.count.ms";
+ // Enable the IndexRegionObserver Coprocessor
+ public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled";
+
/**
* Get executor service used for parallel scans
*/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 551e0ff..147084b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -339,6 +339,10 @@ public class QueryServicesOptions {
public static final int DEFAULT_UPDATE_CACHE_FREQUENCY = 0;
public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100;
+ public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 10*60*1000; /* 10 min */
+ public static final int DEFAULT_GLOBAL_INDEX_REPAIR_COUNT = DEFAULT_MUTATE_BATCH_SIZE;
+ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
+
public static final boolean DEFAULT_PROPERTY_POLICY_PROVIDER_ENABLED = true;
@SuppressWarnings("serial")
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 712842f..2e71487 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -21,9 +21,11 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.PhoenixRuntime.getTable;
import java.io.ByteArrayInputStream;
@@ -37,6 +39,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
@@ -54,6 +57,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -75,6 +80,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
@@ -82,12 +88,16 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
+import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -826,7 +836,7 @@ public class IndexUtil {
.getIndexes().iterator()) : Collections.<PTable> emptyIterator();
return Lists.newArrayList(indexIterator);
}
-
+
public static Result incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,long amount) throws IOException {
byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable);
Increment incr = new Increment(indexTableKey);
@@ -855,4 +865,86 @@ public class IndexUtil {
throw new IOException(e);
}
}
+
+ /**
+  * Tells whether the scan explicitly selects at least one column qualifier.
+  *
+  * @param scan the scan whose family map is inspected
+  * @return true if some family in the scan's family map has a non-empty qualifier set; false if the
+  *         map is null or empty, or every family maps to a null or empty qualifier set
+  */
+ private static boolean containsOneOrMoreColumn(Scan scan) {
+     Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+     if (familyMap == null) {
+         return false;
+     }
+     for (NavigableSet<byte[]> qualifiers : familyMap.values()) {
+         if (qualifiers != null && !qualifiers.isEmpty()) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Makes the empty column visible to a scan: registers it with the scan's projection/comparison
+  * filters and, where the scan already names explicit columns (or uses a {@code FirstKeyOnlyFilter}),
+  * adds it to the scan's column set.
+  *
+  * @param scan the scan to adjust
+  * @param emptyCF the column family of the empty column
+  * @param emptyCQ the column qualifier of the empty column
+  */
+ private static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+     // Set once the empty column has been added to the scan so it is not added again.
+     boolean addedEmptyColumn = false;
+     Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
+     while (iterator.hasNext()) {
+         Filter filter = iterator.next();
+         if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
+             ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
+             if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                 scan.addColumn(emptyCF, emptyCQ);
+                 addedEmptyColumn = true;
+             }
+         }
+         else if (filter instanceof ColumnProjectionFilter) {
+             ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
+             if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                 scan.addColumn(emptyCF, emptyCQ);
+                 addedEmptyColumn = true;
+             }
+         }
+         else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
+             ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
+         }
+         else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
+             // With a FirstKeyOnlyFilter the empty column is added regardless of the scan's column set.
+             scan.addColumn(emptyCF, emptyCQ);
+             addedEmptyColumn = true;
+         }
+     }
+     // No filter triggered the addition: still add the empty column if the scan names explicit columns.
+     if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+         scan.addColumn(emptyCF, emptyCQ);
+     }
+ }
+
+ /**
+  * Sets the scan attributes used for index read repair when {@code table} is a mutable,
+  * non-transactional global index; does nothing for any other table.
+  * <p>
+  * The attributes set are the serialized index maintainer (if absent), {@code CHECK_VERIFY_COLUMN},
+  * the physical data table name, the empty column family and qualifier, and the view constants
+  * (if absent). The empty column is also added to the scan and its filters.
+  *
+  * @param scan the scan to decorate
+  * @param table the table being scanned
+  * @param phoenixConnection connection used to resolve the data table and build the index maintainer
+  * @throws SQLException if resolving the data table or building the index metadata fails
+  */
+ public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
+     boolean mutableNonTxIndex = !table.isTransactional() && !table.isImmutableRows()
+             && table.getType() == PTableType.INDEX;
+     if (!mutableNonTxIndex || table.getIndexType() != PTable.IndexType.GLOBAL) {
+         return;
+     }
+     String parentSchema = table.getParentSchemaName().getString();
+     String parentTable = table.getParentTableName().getString();
+     PTable dataTable;
+     try {
+         dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(parentSchema, parentTable));
+     } catch (TableNotFoundException ignored) {
+         // This index table must be being deleted. No need to set the scan attributes
+         return;
+     }
+     boolean indexKnownToDataTable = dataTable.getIndexes().contains(table);
+     if (!indexKnownToDataTable) {
+         return;
+     }
+     // Ship the index maintainer with the scan unless one is already attached.
+     if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+         ImmutableBytesWritable serializedMaintainer = new ImmutableBytesWritable();
+         IndexMaintainer.serialize(dataTable, serializedMaintainer, Collections.singletonList(table), phoenixConnection);
+         scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(serializedMaintainer));
+     }
+     scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+     scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
+     IndexMaintainer maintainer = table.getIndexMaintainer(dataTable, phoenixConnection);
+     byte[] emptyFamily = maintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+     byte[] emptyQualifier = maintainer.getEmptyKeyValueQualifier();
+     scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyFamily);
+     scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyQualifier);
+     if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
+         BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+     }
+     addEmptyColumnToScan(scan, emptyFamily, emptyQualifier);
+ }
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 0cb96e2..8c626df 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.query;
import static org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -622,6 +623,7 @@ public abstract class BaseTest {
conf.setInt("hbase.catalogjanitor.interval", 5000);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
+ conf.setInt(GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB, 5);
return conf;
}