You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/02/15 20:07:01 UTC
[1/5] phoenix git commit: PHOENIX-2221 Option to make data regions
not writable when index regions are not available (Alicia Ying Shu,
James Taylor)
Repository: phoenix
Updated Branches:
refs/heads/master 6881aef0c -> 60ef7cd54
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index f5c9295..abd31c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -66,6 +66,8 @@ import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+import co.cask.tephra.TxConstants;
+
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@@ -78,8 +80,6 @@ import com.google.common.collect.Maps;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.sun.istack.NotNull;
-import co.cask.tephra.TxConstants;
-
/**
*
* Base class for PTable implementors. Provides abstraction for
@@ -101,6 +101,7 @@ public class PTableImpl implements PTable {
private PIndexState state;
private long sequenceNumber;
private long timeStamp;
+ private long indexDisableTimestamp;
// Have MultiMap for String->PColumn (may need family qualifier)
private List<PColumn> pkColumns;
private List<PColumn> allColumns;
@@ -207,7 +208,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -216,7 +217,7 @@ public class PTableImpl implements PTable {
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -225,7 +226,7 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -234,7 +235,7 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
@@ -244,7 +245,7 @@ public class PTableImpl implements PTable {
sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency);
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -254,7 +255,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
- table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -264,7 +265,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
- table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -274,7 +275,7 @@ public class PTableImpl implements PTable {
table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats,
- table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -282,11 +283,13 @@ public class PTableImpl implements PTable {
List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
- IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency) throws SQLException {
+ IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+ long indexDisableTimestamp) throws SQLException {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
- indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency);
+ indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
+ updateCacheFrequency,indexDisableTimestamp);
}
public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -295,12 +298,12 @@ public class PTableImpl implements PTable {
boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
- @NotNull PTableStats stats, int baseColumnCount)
+ @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp)
throws SQLException {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
- indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency);
+ indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp);
}
private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -308,13 +311,13 @@ public class PTableImpl implements PTable {
PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
- PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency) throws SQLException {
+ PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp) throws SQLException {
init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
- isTransactional, updateCacheFrequency);
+ isTransactional, updateCacheFrequency, indexDisableTimestamp);
}
-
+
@Override
public long getUpdateCacheFrequency() {
return updateCacheFrequency;
@@ -345,7 +348,7 @@ public class PTableImpl implements PTable {
PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
- IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency) throws SQLException {
+ IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp) throws SQLException {
Preconditions.checkNotNull(schemaName);
Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -363,6 +366,7 @@ public class PTableImpl implements PTable {
this.type = type;
this.state = state;
this.timeStamp = timeStamp;
+ this.indexDisableTimestamp = indexDisableTimestamp;
this.sequenceNumber = sequenceNumber;
this.pkName = pkName;
this.isImmutableRows = isImmutableRows;
@@ -860,6 +864,11 @@ public class PTableImpl implements PTable {
}
@Override
+ public long getIndexDisableTimestamp() {
+ return indexDisableTimestamp;
+ }
+
+ @Override
public PColumn getPKColumn(String name) throws ColumnNotFoundException {
List<PColumn> columns = columnsByName.get(name);
int size = columns.size();
@@ -1001,6 +1010,7 @@ public class PTableImpl implements PTable {
}
long sequenceNumber = table.getSequenceNumber();
long timeStamp = table.getTimeStamp();
+ long indexDisableTimestamp = table.getIndexDisableTimestamp();
PName pkName = null;
if (table.hasPkNameBytes()) {
pkName = PNameFactory.newName(table.getPkNameBytes().toByteArray());
@@ -1076,7 +1086,7 @@ public class PTableImpl implements PTable {
(bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
- isTransactional, updateCacheFrequency);
+ isTransactional, updateCacheFrequency, indexDisableTimestamp);
return result;
} catch (SQLException e) {
throw new RuntimeException(e); // Impossible
@@ -1167,7 +1177,8 @@ public class PTableImpl implements PTable {
builder.setBaseColumnCount(table.getBaseColumnCount());
builder.setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable());
builder.setUpdateCacheFrequency(table.getUpdateCacheFrequency());
-
+ builder.setIndexDisableTimestamp(table.getIndexDisableTimestamp());
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 6b89187..a8757ab 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -233,7 +233,7 @@ public class CorrelatePlanTest {
PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
null, null, columns, null, null, Collections.<PTable>emptyList(),
false, Collections.<PName>emptyList(), null, null, false, false, false, null,
- null, null, true, false, 0);
+ null, null, true, false, 0, 0L);
TableRef sourceTable = new TableRef(pTable);
List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
for (PColumn column : sourceTable.getTable().getColumns()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index 8bdb57f..09bdeb6 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -89,4 +89,5 @@ message PTable {
optional bool rowKeyOrderOptimizable = 26;
optional bool transactional = 27;
optional int64 updateCacheFrequency = 28;
+ optional int64 indexDisableTimestamp = 29;
}
[5/5] phoenix git commit: PHOENIX-2684
LiteralExpression.getBooleanLiteralExpression should compare with .equals()
(Julian Eberius)
Posted by ja...@apache.org.
PHOENIX-2684 LiteralExpression.getBooleanLiteralExpression should compare with .equals() (Julian Eberius)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/60ef7cd5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/60ef7cd5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/60ef7cd5
Branch: refs/heads/master
Commit: 60ef7cd54e26fd1635e503c7d7981ba2cdf4c6fc
Parents: 43b34da
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Feb 15 09:50:43 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Feb 15 10:14:58 2016 -0800
----------------------------------------------------------------------
.../phoenix/end2end/CompareDecimalToLongIT.java | 241 ------------------
.../apache/phoenix/end2end/PrimitiveTypeIT.java | 245 +++++++++++++++++++
.../phoenix/expression/LiteralExpression.java | 2 +-
.../java/org/apache/phoenix/query/BaseTest.java | 14 +-
4 files changed, 252 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/60ef7cd5/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompareDecimalToLongIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompareDecimalToLongIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompareDecimalToLongIT.java
deleted file mode 100644
index 3a358c4..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CompareDecimalToLongIT.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.util.Properties;
-
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.Test;
-
-
-public class CompareDecimalToLongIT extends BaseClientManagedTimeIT {
- protected static void initTableValues(byte[][] splits, long ts) throws Exception {
- ensureTableCreated(getUrl(),"LongInKeyTest",splits, ts-2);
-
- // Insert all rows at ts
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
- Connection conn = DriverManager.getConnection(url);
- conn.setAutoCommit(true);
- PreparedStatement stmt = conn.prepareStatement(
- "upsert into " +
- "LongInKeyTest VALUES(?)");
- stmt.setLong(1, 2);
- stmt.execute();
- conn.close();
- }
-
- @Test
- public void testCompareLongGTDecimal() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l > 1.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- assertTrue (rs.next());
- assertEquals(2, rs.getLong(1));
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testCompareLongGTEDecimal() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l >= 1.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertTrue (rs.next());
- assertEquals(2, rs.getLong(1));
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testCompareLongLTDecimal() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l < 1.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testCompareLongLTEDecimal() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l <= 1.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
- @Test
- public void testCompareLongGTDecimal2() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l > 2.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testCompareLongGTEDecimal2() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l >= 2.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testCompareLongLTDecimal2() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l < 2.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertTrue (rs.next());
- assertEquals(2, rs.getLong(1));
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testCompareLongLTEDecimal2() throws Exception {
- long ts = nextTimestamp();
- initTableValues(null, ts);
- String query = "SELECT l FROM LongInKeyTest where l <= 2.5";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
- try {
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- /*
- * Failing because we're not converting the constant to the type of the RHS
- * when forming the start/stop key.
- * For this case, 1.5 -> 1L
- * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
- *
- */
- assertTrue (rs.next());
- assertEquals(2, rs.getLong(1));
- assertFalse(rs.next());
- } finally {
- conn.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/60ef7cd5/phoenix-core/src/it/java/org/apache/phoenix/end2end/PrimitiveTypeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PrimitiveTypeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PrimitiveTypeIT.java
new file mode 100644
index 0000000..cc92ea9
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PrimitiveTypeIT.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+
+public class PrimitiveTypeIT extends BaseHBaseManagedTimeIT {
+ private static void initTableValues(Connection conn) throws Exception {
+ conn.createStatement().execute("create table T (l bigint not null primary key, b boolean)");
+ PreparedStatement stmt = conn.prepareStatement(
+ "upsert into " +
+ "T VALUES(?)");
+ stmt.setLong(1, 2);
+ stmt.execute();
+ conn.commit();
+ }
+
+ @Test
+ public void testCompareLongGTDecimal() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l > 1.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCompareLongGTEDecimal() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l >= 1.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertTrue (rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCompareLongLTDecimal() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l < 1.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCompareLongLTEDecimal() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l <= 1.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+ @Test
+ public void testCompareLongGTDecimal2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l > 2.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCompareLongGTEDecimal2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l >= 2.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCompareLongLTDecimal2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l < 2.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertTrue (rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testCompareLongLTEDecimal2() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "SELECT l FROM T where l <= 2.5";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ /*
+ * Failing because we're not converting the constant to the type of the RHS
+ * when forming the start/stop key.
+ * For this case, 1.5 -> 1L
+ * if where l < 1.5 then 1.5 -> 1L and then to 2L because it's not inclusive
+ *
+ */
+ assertTrue (rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testBooleanAsObject() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ initTableValues(conn);
+ String query = "upsert into T values (2, ?)";
+ try {
+ PreparedStatement statement = conn.prepareStatement(query);
+ statement.setObject(1, new Boolean("false"));
+ statement.execute();
+ conn.commit();
+ statement = conn.prepareStatement("SELECT l,b,? FROM T");
+ statement.setObject(1, new Boolean("false"));
+ ResultSet rs = statement.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertEquals(Boolean.FALSE, rs.getObject(2));
+ assertEquals(Boolean.FALSE, rs.getObject(3));
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/60ef7cd5/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java
index e911aae..ad1c7c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java
@@ -85,7 +85,7 @@ public class LiteralExpression extends BaseTerminalExpression {
}
private static LiteralExpression getBooleanLiteralExpression(Boolean bool, Determinism determinism){
- return BOOLEAN_EXPRESSIONS[ (bool==Boolean.FALSE ? 0 : Determinism.values().length) + determinism.ordinal()];
+ return BOOLEAN_EXPRESSIONS[ (Boolean.FALSE.equals(bool) ? 0 : Determinism.values().length) + determinism.ordinal()];
}
public static boolean isFalse(Expression child) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/60ef7cd5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index a67a530..9539e69 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -172,18 +172,18 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.util.Providers;
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.TransactionService;
+import co.cask.tephra.metrics.TxMetricsCollector;
+import co.cask.tephra.persist.InMemoryTransactionStateStorage;
+
/**
*
* Base class that contains all the methods needed by
@@ -429,8 +429,6 @@ public abstract class BaseTest {
" (i integer not null primary key)");
builder.put("IntIntKeyTest","create table IntIntKeyTest" +
" (i integer not null primary key, j integer)");
- builder.put("LongInKeyTest","create table LongInKeyTest" +
- " (l bigint not null primary key)");
builder.put("PKIntValueTest", "create table PKIntValueTest" +
" (pk integer not null primary key)");
builder.put("PKBigIntValueTest", "create table PKBigIntValueTest" +
[3/5] phoenix git commit: PHOENIX-2635 Partial index rebuild doesn't
work for mutable data
Posted by ja...@apache.org.
PHOENIX-2635 Partial index rebuild doesn't work for mutable data
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/046bda34
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/046bda34
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/046bda34
Branch: refs/heads/master
Commit: 046bda34771aaec3befd4ad17024afc5af9b83ed
Parents: e2a6386
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Feb 15 00:33:05 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Feb 15 10:14:54 2016 -0800
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 379 +++++++------------
.../end2end/index/ReadOnlyIndexFailureIT.java | 75 ++--
.../EndToEndCoveredColumnsIndexBuilderIT.java | 2 +-
.../coprocessor/BaseScannerRegionObserver.java | 5 +-
.../coprocessor/MetaDataRegionObserver.java | 120 +++++-
.../hbase/index/covered/LocalTableState.java | 19 +-
.../phoenix/hbase/index/covered/TableState.java | 7 +-
.../index/covered/data/LocalHBaseState.java | 6 +-
.../hbase/index/covered/data/LocalTable.java | 9 +-
.../example/CoveredColumnIndexCodec.java | 4 +-
.../hbase/index/scanner/ScannerBuilder.java | 1 -
.../apache/phoenix/index/IndexMaintainer.java | 4 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 12 +-
.../phoenix/index/PhoenixIndexMetaData.java | 10 +-
.../index/PhoenixTransactionalIndexer.java | 2 +-
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 32 +-
.../apache/phoenix/parse/NamedTableNode.java | 8 +
.../phoenix/query/QueryServicesOptions.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 34 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 8 +-
.../index/covered/TestLocalTableState.java | 10 +-
.../example/TestCoveredColumnIndexCodec.java | 4 +-
22 files changed, 368 insertions(+), 385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 176c5a0..ebc6988 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -30,24 +30,17 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseCluster;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.QueryServices;
@@ -61,7 +54,6 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -75,28 +67,29 @@ import com.google.common.collect.Maps;
* For some reason dropping tables after running this test
* fails unless it runs its own mini cluster.
*
- *
- * @since 2.1
*/
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
- private Timer scheduleTimer;
-
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
+
private String tableName;
private String indexName;
private String fullTableName;
private String fullIndexName;
- private boolean transactional;
+ private final boolean transactional;
+ private final boolean localIndex;
private final String tableDDLOptions;
- public MutableIndexFailureIT(boolean transactional) {
+ public MutableIndexFailureIT(boolean transactional, boolean localIndex) {
this.transactional = transactional;
+ this.localIndex = localIndex;
this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
- this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "");
- this.indexName = "IDX";
+ this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "");
+ this.indexName = INDEX_NAME;
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
}
@@ -104,31 +97,28 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
- serverProps.put("hbase.client.retries.number", "2");
+ serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
+ serverProps.put("data.tx.snapshot.dir", "/tmp");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- @Parameters(name = "transactional = {0}")
+ @Parameters(name = "transactional = {0}, localIndex = {1}")
public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] { { false }, { true } });
- }
-
- @Test
- public void testWriteFailureDisablesLocalIndex() throws Exception {
- helpTestWriteFailureDisablesIndex(true);
+ return Arrays.asList(new Boolean[][] { { false, false }, { false, true }, { true, false }, { true, true } });
}
@Test
public void testWriteFailureDisablesIndex() throws Exception {
- helpTestWriteFailureDisablesIndex(false);
+ helpTestWriteFailureDisablesIndex();
}
- public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception {
+ public void helpTestWriteFailureDisablesIndex() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = driver.connect(url, props)) {
String query;
@@ -140,15 +130,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
- if(localIndex) {
- conn.createStatement().execute(
- "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- conn.createStatement().execute(
- "CREATE LOCAL INDEX " + indexName+ "_2" + " ON " + fullTableName + " (v2) INCLUDE (v1)");
- } else {
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- }
+ FAIL_WRITE = false;
+ conn.createStatement().execute(
+ "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
@@ -167,23 +151,50 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setString(2, "x");
stmt.setString(3, "1");
stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setString(2, "y");
+ stmt.setString(3, "2");
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setString(2, "z");
+ stmt.setString(3, "3");
+ stmt.execute();
conn.commit();
- TableName indexTable =
- TableName.valueOf(localIndex ? MetaDataUtil
- .getLocalIndexTableName(fullTableName) : fullIndexName);
- HBaseAdmin admin = getUtility().getHBaseAdmin();
- HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
- try{
- admin.disableTable(indexTable);
- admin.deleteTable(indexTable);
- } catch (TableNotFoundException ignore) {}
+ query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ String expectedPlan =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
+ assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals("y", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+
+ FAIL_WRITE = true;
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a2");
+ // Insert new row
+ stmt.setString(1, "d");
+ stmt.setString(2, "d");
+ stmt.setString(3, "4");
+ stmt.execute();
+ // Update existing row
+ stmt.setString(1, "a");
stmt.setString(2, "x2");
stmt.setString(3, "2");
stmt.execute();
+ // Delete existing row
+ stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
+ stmt.setString(1, "b");
+ stmt.execute();
try {
conn.commit();
fail();
@@ -196,20 +207,17 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
// the index is only disabled for non-txn tables upon index table write failure
- PIndexState indexState = transactional ? PIndexState.ACTIVE : PIndexState.DISABLE;
- assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
- if(localIndex) {
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName + "_2", rs.getString(3));
- assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
+ if (transactional) {
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ } else {
+ String indexState = rs.getString("INDEX_STATE");
+ assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
}
+ assertFalse(rs.next());
- // if the table is transactional the write to the index table will fail because the
- // index has not been disabled
+ // If the table is transactional the write to both the data and index table will fail
+ // in an all or none manner. If the table is not transactional, then the data writes
+ // would have succeeded while the index writes would have failed.
if (!transactional) {
// Verify UPSERT on data table still work after index is disabled
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
@@ -218,210 +226,101 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setString(3, "3");
stmt.execute();
conn.commit();
- }
- if (transactional) {
- // if the table was transactional there should be 1 row (written before the index
- // was disabled)
- query = "SELECT /*+ NO_INDEX */ v2 FROM " + fullTableName;
+ // Verify previous writes succeeded to data table
+ query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- String expectedPlan =
+ expectedPlan =
"CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
- assertEquals("1", rs.getString(1));
- assertFalse(rs.next());
- } else {
- // if the table was not transactional there should be three rows (all writes to data
- // table should succeed)
- query = "SELECT v2 FROM " + fullTableName;
- rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- String expectedPlan =
- "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
- assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
- rs = conn.createStatement().executeQuery(query);
+ assertEquals("a", rs.getString(1));
+ assertEquals("x2", rs.getString(2));
assertTrue(rs.next());
- assertEquals("1", rs.getString(1));
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x3", rs.getString(2));
assertTrue(rs.next());
- assertEquals("2", rs.getString(1));
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
assertTrue(rs.next());
- assertEquals("3", rs.getString(1));
+ assertEquals("d", rs.getString(1));
+ assertEquals("d", rs.getString(2));
assertFalse(rs.next());
}
- // recreate index table
- admin.createTable(indexTableDesc);
- do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
- break;
- }
- if(localIndex) {
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
+ // re-enable index table
+ FAIL_WRITE = false;
+
+ boolean isActive = false;
+ if (!transactional) {
+ int maxTries = 3, nTries = 0;
+ do {
+ Thread.sleep(15 * 1000); // sleep 15 secs
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+ isActive = true;
break;
}
- }
- } while(true);
+ } while(++nTries < maxTries);
+ assertTrue(isActive);
+ }
// Verify UPSERT on data table still work after index table is recreated
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a4");
+ stmt.setString(1, "a3");
stmt.setString(2, "x4");
stmt.setString(3, "4");
stmt.execute();
conn.commit();
- // verify index table has data
- query = "SELECT count(1) FROM " + fullIndexName;
+ // verify index table has correct data
+ query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ expectedPlan =
+ " OVER " + (localIndex ? MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + tableName : fullIndexName);
+ String explainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(explainPlan.contains(expectedPlan));
rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
-
- // for txn tables there will be only one row in the index (a4)
- // for non txn tables there will be three rows because we only partially build index
- // from where we failed and the oldest
- // index row has been deleted when we dropped the index table during test
- assertEquals(transactional ? 1 : 3, rs.getInt(1));
- }
- }
-
-
- @Ignore("See PHOENIX-2332")
- @Test
- public void testWriteFailureWithRegionServerDown() throws Exception {
- String query;
- ResultSet rs;
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = driver.connect(url, props);) {
- conn.setAutoCommit(false);
- conn.createStatement().execute(
- "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "+tableDDLOptions);
- query = "SELECT * FROM " + fullTableName;
- rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
-
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- query = "SELECT * FROM " + fullIndexName;
- rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
-
- // Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName, rs.getString(3));
- assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
-
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a");
- stmt.setString(2, "x");
- stmt.setString(3, "1");
- stmt.execute();
- conn.commit();
-
- // find a RS which doesn't has CATALOG table
- TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
- TableName indexTable = TableName.valueOf(fullIndexName);
- final HBaseCluster cluster = getUtility().getHBaseCluster();
- Collection<ServerName> rss = cluster.getClusterStatus().getServers();
- HBaseAdmin admin = getUtility().getHBaseAdmin();
- List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
- ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
- regions.get(0).getRegionName());
- ServerName metaRS = cluster.getServerHoldingMeta();
- ServerName rsToBeKilled = null;
-
- // find first RS isn't holding META or CATALOG table
- for(ServerName curRS : rss) {
- if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
- rsToBeKilled = curRS;
- break;
- }
- }
- assertTrue(rsToBeKilled != null);
-
- regions = admin.getTableRegions(indexTable);
- final HRegionInfo indexRegion = regions.get(0);
- final ServerName dstRS = rsToBeKilled;
- admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
- getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
- @Override
- public boolean evaluate() throws Exception {
- ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
- indexRegion.getRegionName());
- return (sn != null && sn.equals(dstRS));
- }
- });
-
- // use timer sending updates in every 10ms
- this.scheduleTimer = new Timer(true);
- this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn, fullTableName), 0, 10);
- // let timer sending some updates
- Thread.sleep(100);
-
- // kill RS hosting index table
- getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
-
- // wait for index table completes recovery
- getUtility().waitUntilAllRegionsAssigned(indexTable);
-
- // Verify the metadata for index is correct.
- do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
+ if (transactional) { // failed commit does not get retried
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x4", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals("y", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+ } else { // failed commit eventually succeeds
+ assertTrue(rs.next());
+ assertEquals("d", rs.getString(1));
+ assertEquals("d", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x2", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x4", rs.getString(2));
assertTrue(rs.next());
- if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
- break;
- }
- } while(true);
- this.scheduleTimer.cancel();
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+ }
}
}
-
- static class SendingUpdatesScheduleTask extends TimerTask {
- private static final Log LOG = LogFactory.getLog(SendingUpdatesScheduleTask.class);
-
- // inProgress is to prevent timer from invoking a new task while previous one is still
- // running
- private final static AtomicInteger inProgress = new AtomicInteger(0);
- private final Connection conn;
- private final String fullTableName;
- private int inserts = 0;
-
- public SendingUpdatesScheduleTask(Connection conn, String fullTableName) {
- this.conn = conn;
- this.fullTableName = fullTableName;
- }
-
+
+ public static class FailingRegionObserver extends SimpleRegionObserver {
@Override
- public void run() {
- if(inProgress.get() > 0){
- return;
- }
-
- try {
- inProgress.incrementAndGet();
- inserts++;
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a" + inserts);
- stmt.setString(2, "x" + inserts);
- stmt.setString(3, String.valueOf(inserts));
- stmt.execute();
- conn.commit();
- } catch (Throwable t) {
- LOG.warn("ScheduledBuildIndexTask failed!", t);
- } finally {
- inProgress.decrementAndGet();
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
index 8df82ce..931fcae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
@@ -27,19 +27,20 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -56,6 +57,9 @@ import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;
/**
@@ -69,28 +73,37 @@ import com.google.common.collect.Maps;
*/
@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
- private static final String FAIL_ON_FIRST_PUT = "bbb";
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
private String tableName;
private String indexName;
private String fullTableName;
private String fullIndexName;
+ private final boolean localIndex;
- public ReadOnlyIndexFailureIT() {
- this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
- this.indexName = "IDX";
+ public ReadOnlyIndexFailureIT(boolean localIndex) {
+ this.localIndex = localIndex;
+ this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
+ this.indexName = INDEX_NAME;
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
}
+ @Parameters(name = "localIndex = {0}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] { { false }, { true } });
+ }
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
- serverProps.put("hbase.client.retries.number", "2");
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
@@ -105,16 +118,11 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
}
@Test
- public void testWriteFailureReadOnlyLocalIndex() throws Exception {
- helpTestWriteFailureReadOnlyIndex(true);
- }
-
- @Test
public void testWriteFailureReadOnlyIndex() throws Exception {
- helpTestWriteFailureReadOnlyIndex(false);
+ helpTestWriteFailureReadOnlyIndex();
}
- public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception {
+ public void helpTestWriteFailureReadOnlyIndex() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = driver.connect(url, props)) {
String query;
@@ -126,6 +134,7 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
+ FAIL_WRITE = false;
if(localIndex) {
conn.createStatement().execute(
"CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
@@ -157,9 +166,10 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.execute();
conn.commit();
+ FAIL_WRITE = true;
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "2");
- stmt.setString(2, FAIL_ON_FIRST_PUT);
+ stmt.setString(2, "bbb");
stmt.setString(3, "b2");
stmt.execute();
try {
@@ -201,6 +211,7 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
}
+ FAIL_WRITE = false;
// Second attempt at writing will succeed
int retries = 0;
do {
@@ -222,12 +233,12 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
conn.commit();
// verify index table has data
- query = "SELECT count(1) FROM " + indexName;
+ query = "SELECT count(1) FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
- query = "SELECT v1 FROM " + fullTableName;
+ query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("aaa", rs.getString(1));
@@ -261,29 +272,13 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
return (!rs.wasNull() && ts > 0);
}
+
public static class FailingRegionObserver extends SimpleRegionObserver {
- private Integer failCount = new Integer(0);
-
@Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
- final Durability durability) throws HBaseIOException {
- if (shouldFailUpsert(c, put)) {
- synchronized (failCount) {
- failCount++;
- if (failCount.intValue() == 1) {
- // throwing anything other than instances of IOException result
- // in this coprocessor being unloaded
- // DoNotRetryIOException tells HBase not to retry this mutation
- // multiple times
- throw new DoNotRetryIOException();
- }
- }
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
}
}
-
- private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
- return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT));
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 2396719..fe2f1b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -148,7 +148,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
public void verify(TableState state) {
try {
Scanner kvs =
- ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
+ ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false).getFirst();
int count = 0;
Cell kv;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9487b36..a5533af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -58,10 +58,10 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
-import co.cask.tephra.Transaction;
-
import com.google.common.collect.ImmutableList;
+import co.cask.tephra.Transaction;
+
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
public static final String TX_SCN = "_TxScn";
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
+ public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 4e019cd..0cce4d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.TimerTask;
@@ -31,37 +32,53 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.UpgradeUtil;
+import com.google.common.collect.Lists;
+
/**
* Coprocessor for metadata related operations. This coprocessor would only be registered
@@ -190,8 +207,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// separately, all updating the same data.
RegionScanner scanner = null;
PhoenixConnection conn = null;
- boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
if (inProgress.get() > 0) {
LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running");
return;
@@ -213,9 +228,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+ PTable dataPTable = null;
+ MetaDataClient client = null;
boolean hasMore = false;
List<Cell> results = new ArrayList<Cell>();
+ List<PTable> indexesToPartiallyRebuild = Collections.emptyList();
scanner = this.env.getRegion().getScanner(scan);
+ long earliestDisableTimestamp = Long.MAX_VALUE;
do {
results.clear();
@@ -226,16 +245,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- Long disabledTimeStampVal = 0L;
if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
continue;
}
// disableTimeStamp has to be a positive value
- disabledTimeStampVal = (Long) PLong.INSTANCE.toObject(disabledTimeStamp);
+ long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
if (disabledTimeStampVal <= 0) {
continue;
}
+ if (disabledTimeStampVal < earliestDisableTimestamp) {
+ earliestDisableTimestamp = disabledTimeStampVal;
+ }
byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
@@ -247,12 +268,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
continue;
}
- if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
- && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
- // index has to be either in disable or inactive state
- continue;
- }
-
byte[][] rowKeyMetaData = new byte[3][];
SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
@@ -266,34 +281,101 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
if (conn == null) {
final Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString());
// Set SCN so that we don't ping server and have the upper bound set back to
// the timestamp when the failure occurred.
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
// don't run a second index populations upsert select
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+ dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+ client = new MetaDataClient(conn);
}
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
- PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
LOG.debug("Index rebuild has been skipped because not all regions of index table="
+ indexPTable.getName() + " are online.");
continue;
}
+ // Allow index to begin incremental maintenance as index is back online and we
+ // cannot transition directly from DISABLED -> ACTIVE
+ if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+ AlterIndexStatement statement = new AlterIndexStatement(
+ NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+ dataPTable.getTableName().getString(),
+ false, PIndexState.INACTIVE);
+ client.alterIndex(statement);
+ }
+ indexesToPartiallyRebuild.add(indexPTable);
+ } while (hasMore);
- MetaDataClient client = new MetaDataClient(conn);
+ if (!indexesToPartiallyRebuild.isEmpty()) {
long overlapTime = env.getConfiguration().getLong(
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
- long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
+ long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
- LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
- client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex);
-
- } while (hasMore);
+ LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
+ Scan dataTableScan = new Scan();
+ dataTableScan.setRaw(true);
+ dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+ byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+ try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
+ Result result;
+ try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
+ int batchSize = conn.getMutateBatchSize();
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn);
+ byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ byte[] uuidValue = ServerCacheClient.generateId();
+
+ while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : result.rawCells()) {
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new Delete(CellUtil.cloneRow(cell));
+ del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ mutations.add(del);
+ }
+ del.addDeleteMarker(cell);
+ }
+ }
+ if (mutations.size() == batchSize) {
+ dataHTable.batch(mutations);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ }
+ if (!mutations.isEmpty()) {
+ dataHTable.batch(mutations);
+ }
+ }
+ }
+ for (PTable indexPTable : indexesToPartiallyRebuild) {
+ AlterIndexStatement statement = new AlterIndexStatement(
+ NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+ dataPTable.getTableName().getString(),
+ false, PIndexState.ACTIVE);
+ client.alterIndex(statement);
+ }
+ }
} catch (Throwable t) {
LOG.warn("ScheduledBuildIndexTask failed!", t);
} finally {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index c4ed7a0..2739cc2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -36,8 +36,6 @@ import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import com.google.inject.Key;
-
/**
* Manage the state of the HRegion's view of the table, for the single row.
* <p>
@@ -108,7 +106,7 @@ public class LocalTableState implements TableState {
public void setCurrentTimestamp(long timestamp) {
this.ts = timestamp;
}
-
+
public void resetTrackedColumns() {
this.trackedColumns.clear();
}
@@ -139,6 +137,9 @@ public class LocalTableState implements TableState {
* request - you will never see a column with the timestamp we are tracking, but the next oldest
* timestamp for that column.
* @param indexedColumns the columns to that will be indexed
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ * when replaying mutation state for partial index rebuild where writes succeeded to the data
+ * table, but not to the index table.
* @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
* the builder. Even if no update is necessary for the requested columns, you still need
* to return the {@link IndexUpdate}, just don't set the update for the
@@ -146,8 +147,8 @@ public class LocalTableState implements TableState {
* @throws IOException
*/
public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
- Collection<? extends ColumnReference> indexedColumns) throws IOException {
- ensureLocalStateInitialized(indexedColumns);
+ Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException {
+ ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations);
// filter out things with a newer timestamp and track the column references to which it applies
ColumnTracker tracker = new ColumnTracker(indexedColumns);
synchronized (this.trackedColumns) {
@@ -167,7 +168,7 @@ public class LocalTableState implements TableState {
* {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
* then, there is still fairly low contention as each new Put/Delete will have its own table state.
*/
- private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+ private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
throws IOException {
// check to see if we haven't initialized any columns yet
Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
@@ -175,7 +176,7 @@ public class LocalTableState implements TableState {
if (toCover.isEmpty()) { return; }
// add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
+ this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
// add the covered columns to the set
for (ColumnReference ref : toCover) {
@@ -268,9 +269,9 @@ public class LocalTableState implements TableState {
}
@Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
throws IOException {
- Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+ Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations);
ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 0e961db..bd4bdfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -58,9 +57,13 @@ public interface TableState {
/**
* Get a getter interface for the state of the index row
+ * @param indexedColumns list of indexed columns.
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ * when replaying mutation state for partial index rebuild where writes succeeded to the data
+ * table, but not to the index table.
*/
Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
- Collection<? extends ColumnReference> indexedColumns) throws IOException;
+ Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException;
/**
* @return the row key for the current row for which we are building an index update.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 6d20c18..9968627 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
/**
@@ -35,13 +34,16 @@ public interface LocalHBaseState {
* @param m mutation for which we should get the current table state
* @param toCover all the columns the current row state needs to cover; hint the underlying lookup
* to save getting all the columns for the row
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ * when replaying mutation state for partial index rebuild where writes succeeded to the data
+ * table, but not to the index table.
* @return the full state of the given row. Includes all current versions (even if they are not
* usually visible to the client (unless they are also doing a raw scan)). Never returns a
* <tt>null</tt> {@link Result} - instead, when there is not data for the row, returns a
* {@link Result} with no stored {@link KeyValue}s.
* @throws IOException if there is an issue reading the row
*/
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 549fe8c..003df2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -52,13 +51,19 @@ public class LocalTable implements LocalHBaseState {
}
@Override
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
throws IOException {
byte[] row = m.getRow();
// need to use a scan here so we can get raw state, which Get doesn't provide.
Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
s.setStartRow(row);
s.setStopRow(row);
+ if (ignoreNewerMutations) {
+ // Provides a means of client indicating that newer cells should not be considered,
+ // enabling mutations to be replayed to partially rebuild the index when a write fails.
+ long ts = m.getFamilyCellMap().firstEntry().getValue().get(0).getTimestamp();
+ s.setTimeRange(0,ts);
+ }
Region region = this.env.getRegion();
RegionScanner scanner = region.getScanner(s);
List<Cell> kvs = new ArrayList<Cell>(1);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 4efca9f..0f960e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -77,7 +77,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
List<CoveredColumn> refs = group.getColumns();
try {
- Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+ Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false);
Scanner kvs = stateInfo.getFirst();
Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
// make sure we close the scanner
@@ -132,7 +132,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
List<CoveredColumn> refs = group.getColumns();
try {
- Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+ Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false);
Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
// make sure we close the scanner reference
kvs.getFirst().close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index e120268..b4282ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -59,7 +59,6 @@ public class ScannerBuilder {
public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
- // TODO: This needs to use some form of the filter that Tephra has when transactional
Filter columnFilters = getColumnFilters(indexedColumns);
FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 4d545a2..13ad7e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -89,14 +89,14 @@ import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import co.cask.tephra.TxConstants;
-
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import co.cask.tephra.TxConstants;
+
/**
*
* Class that builds index row key from data row key and current state of
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 7acc90c..8ad4d3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -15,8 +15,8 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
@@ -59,7 +59,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
- List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+ List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
return Collections.emptyList();
}
@@ -67,7 +68,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
ptr.set(state.getCurrentRowKey());
List<IndexUpdate> indexUpdates = Lists.newArrayList();
for (IndexMaintainer maintainer : indexMaintainers) {
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
indexUpdate.setTable(maintainer.getIndexTableName());
@@ -81,7 +82,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
- List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+ List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(state.getCurrentRowKey());
List<IndexUpdate> indexUpdates = Lists.newArrayList();
@@ -90,7 +92,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
// to aid in rollback if there's a KeyValue column in the index. The alternative would be
// to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
// client side.
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
indexUpdate.setTable(maintainer.getIndexTableName());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 60ae915..4fab674 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,8 +22,6 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
-import co.cask.tephra.Transaction;
-
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.GlobalCache;
@@ -39,9 +37,12 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
+import co.cask.tephra.Transaction;
+
public class PhoenixIndexMetaData implements IndexMetaData {
private final Map<String, byte[]> attributes;
private final IndexMetaDataCache indexMetaDataCache;
+ private final boolean ignoreNewerMutations;
private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -87,6 +88,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
this.indexMetaDataCache = getIndexMetaData(env, attributes);
this.attributes = attributes;
+ this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
}
public Transaction getTransaction() {
@@ -100,4 +102,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public Map<String, byte[]> getAttributes() {
return attributes;
}
+
+ public boolean ignoreNewerMutations() {
+ return ignoreNewerMutations;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 26f9725..e4c106e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -486,7 +486,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
@Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
throws IOException {
// TODO: creating these objects over and over again is wasteful
ColumnTracker tracker = new ColumnTracker(indexedColumns);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 22b02c5..2c33d21 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesImpl;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,20 +225,23 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
connectionQueryServices = prevValue;
}
}
- boolean success = false;
- SQLException sqlE = null;
- try {
- connectionQueryServices.init(url, info);
- success = true;
- } catch (SQLException e) {
- sqlE = e;
- }
- finally {
- if (!success) {
- // Remove from map, as initialization failed
- connectionQueryServicesMap.remove(normalizedConnInfo);
- if (sqlE != null) {
- throw sqlE;
+ String noUpgradeProp = info.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
+ if (!Boolean.TRUE.equals(noUpgradeProp)) {
+ boolean success = false;
+ SQLException sqlE = null;
+ try {
+ connectionQueryServices.init(url, info);
+ success = true;
+ } catch (SQLException e) {
+ sqlE = e;
+ }
+ finally {
+ if (!success) {
+ // Remove from map, as initialization failed
+ connectionQueryServicesMap.remove(normalizedConnInfo);
+ if (sqlE != null) {
+ throw sqlE;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
index 4e0906f..d3b4505 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
@@ -39,6 +39,14 @@ public class NamedTableNode extends ConcreteTableNode {
return new NamedTableNode(alias, name, dynColumns);
}
+ public static NamedTableNode create (TableName name) {
+ return new NamedTableNode(null, name, Collections.<ColumnDef>emptyList());
+ }
+
+ public static NamedTableNode create (String schemaName, String tableName) {
+ return new NamedTableNode(null, TableName.create(schemaName, tableName), Collections.<ColumnDef>emptyList());
+ }
+
NamedTableNode(String alias, TableName name) {
super(alias, name);
dynColumns = Collections.<ColumnDef> emptyList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 62297ee..27c5693 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -154,7 +154,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
- public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
+ public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 6409dcd..7f3f850 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -204,8 +204,6 @@ import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import co.cask.tephra.TxConstants;
-
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
@@ -214,6 +212,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+import co.cask.tephra.TxConstants;
+
public class MetaDataClient {
private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
@@ -1096,36 +1096,6 @@ public class MetaDataClient {
}
/**
- * Rebuild indexes from a timestamp which is the value from hbase row key timestamp field
- */
- public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
- boolean needRestoreIndexState = true;
- AlterIndexStatement indexStatement = null;
- if (!blockWriteRebuildIndex) {
- // Need to change index state from Disable to InActive when build index partially so that
- // new changes will be indexed during index rebuilding
- indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
- alterIndex(indexStatement);
- }
- try {
- buildIndex(index, dataTableRef);
- needRestoreIndexState = false;
- } finally {
- if(needRestoreIndexState) {
- if (!blockWriteRebuildIndex) {
- // reset index state to disable
- indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
- alterIndex(indexStatement);
- }
- }
- }
- }
-
- /**
* Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
* MetaDataClient.createTable. In doing so, we perform the following translations:
* 1) Change the type of any columns being indexed to types that support null if the column is nullable.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index bebdd8c..05ba6d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -130,6 +130,11 @@ public class PhoenixRuntime {
public static final String TENANT_ID_ATTRIB = "TenantId";
/**
+ * Use this connection property to prevent an upgrade from occurring when
+ * connecting to a new server version.
+ */
+ public static final String NO_UPGRADE_ATTRIB = "NoUpgrade";
+ /**
* Use this connection property to control the number of rows that are
* batched together on an UPSERT INTO table1... SELECT ... FROM table2.
* It's only used when autoCommit is true and your source table is
@@ -163,7 +168,8 @@ public class PhoenixRuntime {
UPSERT_BATCH_SIZE_ATTRIB,
AUTO_COMMIT_ATTRIB,
CONSISTENCY_ATTRIB,
- REQUEST_METRIC_ATTRIB
+ REQUEST_METRIC_ATTRIB,
+ NO_UPGRADE_ATTRIB
};
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index fa8bd85..a2e45af 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -91,7 +91,7 @@ public class TestLocalTableState {
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
//check that our value still shows up first on scan, even though this is a lazy load
- Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
Scanner s = p.getFirst();
assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
}
@@ -135,13 +135,13 @@ public class TestLocalTableState {
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
// check that the value is there
- Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
Scanner s = p.getFirst();
assertEquals("Didn't get the pending mutation's value first", kv, s.next());
// rollback that value
table.rollback(Arrays.asList(kv));
- p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
s = p.getFirst();
assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
Mockito.verify(env, Mockito.times(1)).getRegion();
@@ -179,14 +179,14 @@ public class TestLocalTableState {
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
// check that the value is there
- Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
Scanner s = p.getFirst();
// make sure it read the table the one time
assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());
// on the second lookup it shouldn't access the underlying table again - the cached columns
// should know they are done
- p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
s = p.getFirst();
assertEquals("Lost already loaded update!", storedKv, s.next());
Mockito.verify(env, Mockito.times(1)).getRegion();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index fc3a976..b8fa72d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
@@ -138,7 +138,7 @@ public class TestCoveredColumnIndexCodec {
}
@Override
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
throws IOException {
return r;
}
[2/5] phoenix git commit: PHOENIX-2221 Option to make data regions
not writable when index regions are not available (Alicia Ying Shu,
James Taylor)
Posted by ja...@apache.org.
PHOENIX-2221 Option to make data regions not writable when index regions are not available (Alicia Ying Shu, James Taylor)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2a6386f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2a6386f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2a6386f
Branch: refs/heads/master
Commit: e2a6386f3b9343aec74c5f96f0e0124e80b9f8b1
Parents: 6881aef
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Feb 14 09:06:14 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Feb 15 00:33:18 2016 -0800
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 31 +-
.../end2end/index/ReadOnlyIndexFailureIT.java | 289 +++++++++++++++++++
.../apache/phoenix/compile/FromCompiler.java | 2 +-
.../apache/phoenix/compile/JoinCompiler.java | 2 +-
.../compile/TupleProjectionCompiler.java | 4 +-
.../apache/phoenix/compile/UnionCompiler.java | 2 +-
.../coprocessor/MetaDataEndpointImpl.java | 92 +++---
.../coprocessor/MetaDataRegionObserver.java | 27 +-
.../coprocessor/generated/PTableProtos.java | 103 ++++++-
.../phoenix/exception/SQLExceptionCode.java | 2 +
.../apache/phoenix/execute/MutationState.java | 39 ++-
.../index/write/DelegateIndexFailurePolicy.java | 58 ++++
.../index/PhoenixIndexFailurePolicy.java | 48 ++-
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../phoenix/query/QueryServicesOptions.java | 1 +
.../apache/phoenix/schema/DelegateTable.java | 5 +
.../apache/phoenix/schema/MetaDataClient.java | 38 +--
.../java/org/apache/phoenix/schema/PTable.java | 1 +
.../org/apache/phoenix/schema/PTableImpl.java | 51 ++--
.../phoenix/execute/CorrelatePlanTest.java | 2 +-
phoenix-protocol/src/main/PTable.proto | 1 +
21 files changed, 660 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 5f39515..176c5a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -172,7 +172,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
TableName indexTable =
TableName.valueOf(localIndex ? MetaDataUtil
.getLocalIndexTableName(fullTableName) : fullIndexName);
- HBaseAdmin admin = this.getUtility().getHBaseAdmin();
+ HBaseAdmin admin = getUtility().getHBaseAdmin();
HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
try{
admin.disableTable(indexTable);
@@ -184,20 +184,10 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setString(2, "x2");
stmt.setString(3, "2");
stmt.execute();
- if (transactional) {
- try {
- conn.commit();
- fail();
- } catch (SQLException e) {
- conn.rollback();
- }
- }
- else {
- try {
- conn.commit();
- fail();
- } catch (SQLException e) {
- }
+ try {
+ conn.commit();
+ fail();
+ } catch (SQLException e) {
}
// Verify the metadata for index is correct.
@@ -341,9 +331,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
// find a RS which doesn't has CATALOG table
TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
TableName indexTable = TableName.valueOf(fullIndexName);
- final HBaseCluster cluster = this.getUtility().getHBaseCluster();
+ final HBaseCluster cluster = getUtility().getHBaseCluster();
Collection<ServerName> rss = cluster.getClusterStatus().getServers();
- HBaseAdmin admin = this.getUtility().getHBaseAdmin();
+ HBaseAdmin admin = getUtility().getHBaseAdmin();
List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
regions.get(0).getRegionName());
@@ -363,7 +353,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
final HRegionInfo indexRegion = regions.get(0);
final ServerName dstRS = rsToBeKilled;
admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
- this.getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+ getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
@@ -379,10 +369,10 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
Thread.sleep(100);
// kill RS hosting index table
- this.getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
+ getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
// wait for index table completes recovery
- this.getUtility().waitUntilAllRegionsAssigned(indexTable);
+ getUtility().waitUntilAllRegionsAssigned(indexTable);
// Verify the metadata for index is correct.
do {
@@ -413,6 +403,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
this.fullTableName = fullTableName;
}
+ @Override
public void run() {
if(inProgress.get() > 0){
return;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
new file mode 100644
index 0000000..8df82ce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+/**
+ *
+ * Test for failure of region server to write to index table.
+ * For some reason dropping tables after running this test
+ * fails unless it runs its own mini cluster.
+ *
+ *
+ * @since 2.1
+ */
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
+ private static final String FAIL_ON_FIRST_PUT = "bbb";
+
+ private String tableName;
+ private String indexName;
+ private String fullTableName;
+ private String fullIndexName;
+
+ public ReadOnlyIndexFailureIT() {
+ this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
+ this.indexName = "IDX";
+ this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+ this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+ serverProps.put("hbase.client.retries.number", "2");
+ serverProps.put("hbase.client.pause", "5000");
+ serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
+ serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+ serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put("hbase.coprocessor.abortonerror", "false");
+ serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
+ Map<String, String> clientProps =
+ Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
+ NUM_SLAVES_BASE = 4;
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ @Test
+ public void testWriteFailureReadOnlyLocalIndex() throws Exception {
+ helpTestWriteFailureReadOnlyIndex(true);
+ }
+
+ @Test
+ public void testWriteFailureReadOnlyIndex() throws Exception {
+ helpTestWriteFailureReadOnlyIndex(false);
+ }
+
+ public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = driver.connect(url, props)) {
+ String query;
+ ResultSet rs;
+ conn.setAutoCommit(false);
+ conn.createStatement().execute(
+ "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ query = "SELECT * FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery(query);
+ assertFalse(rs.next());
+
+ if(localIndex) {
+ conn.createStatement().execute(
+ "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
+ + " (v1) INCLUDE (v2)");
+ } else {
+ conn.createStatement().execute(
+ "CREATE INDEX " + indexName + " ON " + fullTableName
+ + " (v1) INCLUDE (v2)");
+ }
+
+ query = "SELECT * FROM " + fullIndexName;
+ rs = conn.createStatement().executeQuery(query);
+ assertFalse(rs.next());
+
+ // Verify the metadata for index is correct.
+ rs = conn.getMetaData().getTables(null,
+ StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ assertEquals(indexName, rs.getString(3));
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ assertFalse(rs.next());
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+ + " VALUES(?,?,?)");
+ stmt.setString(1, "1");
+ stmt.setString(2, "aaa");
+ stmt.setString(3, "a1");
+ stmt.execute();
+ conn.commit();
+
+ stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ stmt.setString(1, "2");
+ stmt.setString(2, FAIL_ON_FIRST_PUT);
+ stmt.setString(3, "b2");
+ stmt.execute();
+ try {
+ conn.commit();
+ fail();
+ } catch (SQLException e) {
+ }
+
+ // Only successfully committed row should be seen
+ query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("aaa", rs.getString(1));
+ assertFalse(rs.next());
+
+ // Verify the metadata for index is correct.
+ rs = conn.getMetaData().getTables(null,
+ StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ assertEquals(indexName, rs.getString(3));
+ // the index is always active for tables upon index table write failure
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ assertFalse(rs.next());
+
+ // if the table is transactional the write to the index table will fail because the
+ // index has not been disabled
+ // Verify UPSERT on data table is blocked after index write failed
+ stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ stmt.setString(1, "3");
+ stmt.setString(2, "ccc");
+ stmt.setString(3, "3c");
+ try {
+ stmt.execute();
+ /* Writes would be blocked */
+ conn.commit();
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
+ }
+
+ // Second attempt at writing will succeed
+ int retries = 0;
+ do {
+ Thread.sleep(5 * 1000); // sleep 5 secs
+ if(!hasIndexDisableTimestamp(conn, indexName)){
+ break;
+ }
+ if (++retries == 5) {
+ fail("Failed to rebuild index with allowed time");
+ }
+ } while(true);
+
+ // Verify UPSERT on data table still work after index table is recreated
+ stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ stmt.setString(1, "4");
+ stmt.setString(2, "ddd");
+ stmt.setString(3, "4d");
+ stmt.execute();
+ conn.commit();
+
+ // verify index table has data
+ query = "SELECT count(1) FROM " + indexName;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+
+ query = "SELECT v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("aaa", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("bbb", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("ddd", rs.getString(1));
+ assertFalse(rs.next());
+
+ query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("aaa", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("bbb", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("ddd", rs.getString(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
+ " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
+ " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
+ " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
+ " AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'");
+ assertTrue(rs.next());
+ long ts = rs.getLong(1);
+ return (!rs.wasNull() && ts > 0);
+ }
+
+ public static class FailingRegionObserver extends SimpleRegionObserver {
+ private Integer failCount = new Integer(0);
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+ final Durability durability) throws HBaseIOException {
+ if (shouldFailUpsert(c, put)) {
+ synchronized (failCount) {
+ failCount++;
+ if (failCount.intValue() == 1) {
+ // throwing anything other than instances of IOException result
+ // in this coprocessor being unloaded
+ // DoNotRetryIOException tells HBase not to retry this mutation
+ // multiple times
+ throw new DoNotRetryIOException();
+ }
+ }
+ }
+ }
+
+ private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+ return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index dd93c81..ffe9621 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -652,7 +652,7 @@ public class FromCompiler {
PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
null, null, columns, null, null, Collections.<PTable>emptyList(),
false, Collections.<PName>emptyList(), null, null, false, false, false, null,
- null, null, false, false, 0);
+ null, null, false, false, 0, 0L);
String alias = subselectNode.getAlias();
TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index b64b9b7..5d03f57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1302,7 +1302,7 @@ public class JoinCompiler {
left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(),
- left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency());
+ left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 0fc6d74..4be78a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -152,7 +152,7 @@ public class TupleProjectionCompiler {
table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
@@ -179,7 +179,7 @@ public class TupleProjectionCompiler {
retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
- null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+ null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
}
// For extracting column references from single select statement
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index f8b2778..b25baf7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -82,7 +82,7 @@ public class UnionCompiler {
PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME,
PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null,
projectedColumns, null, null, null,
- true, null, null, null, true, true, true, null, null, null, false, false, 0);
+ true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L);
TableRef tableRef = new TableRef(null, tempTable, 0, false);
return tableRef;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 78f9700..ba7eb39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -110,12 +109,9 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.GlobalCache;
@@ -153,6 +149,8 @@ import org.apache.phoenix.parse.PFunction.FunctionArgument;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -192,7 +190,6 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.UpgradeUtil;
-import org.hamcrest.core.IsInstanceOf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -298,6 +295,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
+ private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -458,7 +456,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return;
}
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
- builder.setMutationTime(currentTime);
+ long disableIndexTimestamp = table.getIndexDisableTimestamp();
+ long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
+ for (PTable index : table.getIndexes()) {
+ disableIndexTimestamp = index.getIndexDisableTimestamp();
+ if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
+ minNonZerodisableIndexTimestamp = disableIndexTimestamp;
+ }
+ }
+ // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP
+ // This will keep the table consistent with index as the table has had one more
+ // batch applied to it.
+ if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) {
+ builder.setMutationTime(currentTime);
+ } else {
+ // Subtract one because we add one due to timestamp granularity in Windows
+ builder.setMutationTime(minNonZerodisableIndexTimestamp - 1);
+ }
if (table.getTimeStamp() != tableTimeStamp) {
builder.setTable(PTableImpl.toProto(table));
@@ -482,11 +496,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
PTable newTable;
+ boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
if (newTable == null) {
return null;
}
- if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
+ if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()
+ || (blockWriteRebuildIndex && newTable.getIndexDisableTimestamp() > 0)) {
if (logger.isDebugEnabled()) {
logger.debug("Caching table "
+ Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
@@ -819,7 +836,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 :
PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(),
updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault());
-
+ Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
+ long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
+ indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
+
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = new ArrayList<PTable>();
List<PName> physicalTables = new ArrayList<PName>();
@@ -864,7 +884,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency,
- stats, baseColumnCount);
+ stats, baseColumnCount, indexDisableTimestamp);
}
private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
@@ -2410,19 +2430,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
- PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
- // We only cache the latest, so we'll end up building the table with every call if the
- // client connection has specified an SCN.
- // TODO: If we indicate to the client that we're returning an older version, but there's a
- // newer version available, the client
- // can safely not call this, since we only allow modifications to the latest.
- if (table != null && table.getTimeStamp() < clientTimeStamp) {
- // Table on client is up-to-date with table on server, so just return
- if (isTableDeleted(table)) {
- return null;
- }
- return table;
- }
// Ask Lars about the expense of this call - if we don't take the lock, we still won't get
// partial results
// get the co-processor environment
@@ -2434,6 +2441,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
* from getting rebuilt too often.
*/
final boolean wasLocked = (rowLock != null);
+ boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
if (!wasLocked) {
rowLock = region.getRowLock(key, true);
if (rowLock == null) {
@@ -2441,6 +2450,19 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
try {
+ PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
+ // We only cache the latest, so we'll end up building the table with every call if the
+ // client connection has specified an SCN.
+ // TODO: If we indicate to the client that we're returning an older version, but there's a
+ // newer version available, the client
+ // can safely not call this, since we only allow modifications to the latest.
+ if (table != null && table.getTimeStamp() < clientTimeStamp) {
+ // Table on client is up-to-date with table on server, so just return
+ if (isTableDeleted(table)) {
+ return null;
+ }
+ return table;
+ }
// Try cache again in case we were waiting on a lock
table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We only cache the latest, so we'll end up building the table with every call if the
@@ -2457,7 +2479,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
// Query for the latest table first, since it's not cached
table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
- if (table != null && table.getTimeStamp() < clientTimeStamp) {
+ if ((table != null && table.getTimeStamp() < clientTimeStamp) ||
+ (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
return table;
}
// Otherwise, query for an older version of the table - it won't be cached
@@ -2773,23 +2796,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
.getValueOffset()]);
- // check if we need reset disable time stamp
- if( (newState == PIndexState.DISABLE) &&
- (currentState == PIndexState.DISABLE || currentState == PIndexState.INACTIVE) &&
- (currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) &&
- (disableTimeStampKVIndex >= 0)) {
- Long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
- currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
+ if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) &&
+ (disableTimeStampKVIndex >= 0)) {
+ long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
+ currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
// new DisableTimeStamp is passed in
Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
- Long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
- newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
+ long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
+ newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){
// not reset disable timestamp
newKVs.remove(disableTimeStampKVIndex);
+ disableTimeStampKVIndex = -1;
}
}
-
// Detect invalid transitions
if (currentState == PIndexState.BUILDING) {
if (newState == PIndexState.USABLE) {
@@ -2827,7 +2847,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
PTable returnTable = null;
- if (currentState != newState) {
+ if (currentState != newState || disableTimeStampKVIndex != -1) {
byte[] dataTableKey = null;
if(dataTableKV != null) {
dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
@@ -2837,7 +2857,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
tableMetadata = new ArrayList<Mutation>(tableMetadata);
// insert an empty KV to trigger time stamp update on data table row
Put p = new Put(dataTableKey);
- p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+ p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
tableMetadata.add(p);
}
boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
@@ -2854,7 +2874,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if(dataTableKey != null) {
metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
}
- if (setRowKeyOrderOptimizableCell) {
+ if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) {
returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index a2f7282..4e019cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -72,6 +72,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
+ private boolean blockWriteRebuildIndex = false;
@Override
public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -98,6 +99,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
+ blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
}
private static String getJdbcUrl(RegionCoprocessorEnvironment env) {
@@ -145,7 +148,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
};
(new Thread(r)).start();
- if (!enableRebuildIndex) {
+ if (!enableRebuildIndex && !blockWriteRebuildIndex) {
LOG.info("Failure Index Rebuild is skipped by configuration.");
return;
}
@@ -181,8 +184,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
@Override
public void run() {
+ // FIXME: we should replay the data table Put, as doing a partial index build would only add
+ // the new rows and not delete the previous index value. Also, we should restrict the scan
+ // to only data within this region (as otherwise *every* region will be running this code
+ // separately, all updating the same data.
RegionScanner scanner = null;
PhoenixConnection conn = null;
+ boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
if (inProgress.get() > 0) {
LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running");
return;
@@ -192,7 +201,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
Scan scan = new Scan();
SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
- CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
+ CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L));
filter.setFilterIfMissing(true);
scan.setFilter(filter);
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
@@ -233,11 +242,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
if ((dataTable == null || dataTable.length == 0)
- || (indexStat == null || indexStat.length == 0)
- || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
+ || (indexStat == null || indexStat.length == 0)) {
+ // data table name can't be empty
+ continue;
+ }
+
+ if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
&& (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
// index has to be either in disable or inactive state
- // data table name can't be empty
continue;
}
@@ -254,6 +266,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
if (conn == null) {
final Properties props = new Properties();
+ // Set SCN so that we don't ping server and have the upper bound set back to
+ // the timestamp when the failure occurred.
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
// don't run a second index populations upsert select
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
@@ -276,7 +291,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
- client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp));
+ client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex);
} while (hasMore);
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index f74ed0b..9fdfe51 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3328,6 +3328,16 @@ public final class PTableProtos {
* <code>optional int64 updateCacheFrequency = 28;</code>
*/
long getUpdateCacheFrequency();
+
+ // optional int64 indexDisableTimestamp = 29;
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ boolean hasIndexDisableTimestamp();
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ long getIndexDisableTimestamp();
}
/**
* Protobuf type {@code PTable}
@@ -3538,6 +3548,11 @@ public final class PTableProtos {
updateCacheFrequency_ = input.readInt64();
break;
}
+ case 232: {
+ bitField0_ |= 0x01000000;
+ indexDisableTimestamp_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4132,6 +4147,22 @@ public final class PTableProtos {
return updateCacheFrequency_;
}
+ // optional int64 indexDisableTimestamp = 29;
+ public static final int INDEXDISABLETIMESTAMP_FIELD_NUMBER = 29;
+ private long indexDisableTimestamp_;
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ public boolean hasIndexDisableTimestamp() {
+ return ((bitField0_ & 0x01000000) == 0x01000000);
+ }
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ public long getIndexDisableTimestamp() {
+ return indexDisableTimestamp_;
+ }
+
private void initFields() {
schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4161,6 +4192,7 @@ public final class PTableProtos {
rowKeyOrderOptimizable_ = false;
transactional_ = false;
updateCacheFrequency_ = 0L;
+ indexDisableTimestamp_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4312,6 +4344,9 @@ public final class PTableProtos {
if (((bitField0_ & 0x00800000) == 0x00800000)) {
output.writeInt64(28, updateCacheFrequency_);
}
+ if (((bitField0_ & 0x01000000) == 0x01000000)) {
+ output.writeInt64(29, indexDisableTimestamp_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4438,6 +4473,10 @@ public final class PTableProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(28, updateCacheFrequency_);
}
+ if (((bitField0_ & 0x01000000) == 0x01000000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(29, indexDisableTimestamp_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4589,6 +4628,11 @@ public final class PTableProtos {
result = result && (getUpdateCacheFrequency()
== other.getUpdateCacheFrequency());
}
+ result = result && (hasIndexDisableTimestamp() == other.hasIndexDisableTimestamp());
+ if (hasIndexDisableTimestamp()) {
+ result = result && (getIndexDisableTimestamp()
+ == other.getIndexDisableTimestamp());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4714,6 +4758,10 @@ public final class PTableProtos {
hash = (37 * hash) + UPDATECACHEFREQUENCY_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getUpdateCacheFrequency());
}
+ if (hasIndexDisableTimestamp()) {
+ hash = (37 * hash) + INDEXDISABLETIMESTAMP_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getIndexDisableTimestamp());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4894,6 +4942,8 @@ public final class PTableProtos {
bitField0_ = (bitField0_ & ~0x04000000);
updateCacheFrequency_ = 0L;
bitField0_ = (bitField0_ & ~0x08000000);
+ indexDisableTimestamp_ = 0L;
+ bitField0_ = (bitField0_ & ~0x10000000);
return this;
}
@@ -5050,6 +5100,10 @@ public final class PTableProtos {
to_bitField0_ |= 0x00800000;
}
result.updateCacheFrequency_ = updateCacheFrequency_;
+ if (((from_bitField0_ & 0x10000000) == 0x10000000)) {
+ to_bitField0_ |= 0x01000000;
+ }
+ result.indexDisableTimestamp_ = indexDisableTimestamp_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5228,6 +5282,9 @@ public final class PTableProtos {
if (other.hasUpdateCacheFrequency()) {
setUpdateCacheFrequency(other.getUpdateCacheFrequency());
}
+ if (other.hasIndexDisableTimestamp()) {
+ setIndexDisableTimestamp(other.getIndexDisableTimestamp());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6964,6 +7021,39 @@ public final class PTableProtos {
return this;
}
+ // optional int64 indexDisableTimestamp = 29;
+ private long indexDisableTimestamp_ ;
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ public boolean hasIndexDisableTimestamp() {
+ return ((bitField0_ & 0x10000000) == 0x10000000);
+ }
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ public long getIndexDisableTimestamp() {
+ return indexDisableTimestamp_;
+ }
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ public Builder setIndexDisableTimestamp(long value) {
+ bitField0_ |= 0x10000000;
+ indexDisableTimestamp_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 indexDisableTimestamp = 29;</code>
+ */
+ public Builder clearIndexDisableTimestamp() {
+ bitField0_ = (bitField0_ & ~0x10000000);
+ indexDisableTimestamp_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:PTable)
}
@@ -7011,7 +7101,7 @@ public final class PTableProtos {
"\016\n\006values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003",
" \001(\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePost" +
"sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGui" +
- "dePosts\"\244\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
+ "dePosts\"\303\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
"\002(\014\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType" +
"\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022" +
"\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002" +
@@ -7028,10 +7118,11 @@ public final class PTableProtos {
"storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(" +
"\005\022\036\n\026rowKeyOrderOptimizable\030\032 \001(\010\022\025\n\rtra" +
"nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" +
- "\030\034 \001(\003*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER",
- "\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org" +
- ".apache.phoenix.coprocessor.generatedB\014P" +
- "TableProtosH\001\210\001\001\240\001\001"
+ "\030\034 \001(\003\022\035\n\025indexDisableTimestamp\030\035 \001(\003*A\n",
+ "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" +
+ "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p" +
+ "hoenix.coprocessor.generatedB\014PTableProt" +
+ "osH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7055,7 +7146,7 @@ public final class PTableProtos {
internal_static_PTable_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PTable_descriptor,
- new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", });
+ new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", });
return null;
}
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index b1d8e7d..7ddd14c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -338,6 +338,8 @@ public enum SQLExceptionCode {
CANNOT_SPLIT_LOCAL_INDEX(1109,"XCL09", "Local index may not be pre-split."),
CANNOT_SALT_LOCAL_INDEX(1110,"XCL10", "Local index may not be salted."),
+ INDEX_FAILURE_BLOCK_WRITE(1120, "XCL20", "Writes to table blocked until index can be updated."),
+
/**
* Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1658962..6095089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -37,6 +37,18 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+import co.cask.tephra.visibility.FenceWait;
+import co.cask.tephra.visibility.VisibilityFence;
+
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -68,6 +80,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
@@ -91,18 +104,6 @@ import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.hbase11.TransactionAwareHTable;
-import co.cask.tephra.visibility.FenceWait;
-import co.cask.tephra.visibility.VisibilityFence;
-
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -737,6 +738,16 @@ public class MutationState implements SQLCloseable {
// Always update tableRef table as the one we've cached may be out of date since when we executed
// the UPSERT VALUES call and updated in the cache before this.
tableRef.setTable(resolvedTable);
+ List<PTable> indexes = resolvedTable.getIndexes();
+ for (PTable idxTtable : indexes) {
+ // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that
+ // our failure mode is block writes on index failure.
+ if (idxTtable.getIndexState() == PIndexState.ACTIVE && idxTtable.getIndexDisableTimestamp() > 0) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE)
+ .setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString()).build().buildException();
+ }
+ }
long timestamp = result.getMutationTime();
if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
serverTimeStamp = timestamp;
@@ -748,8 +759,8 @@ public class MutationState implements SQLCloseable {
Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
if (colValues != PRow.DELETE_MARKER) {
for (PColumn column : colValues.keySet()) {
- if (!column.isDynamic())
- columns.add(column);
+ if (!column.isDynamic())
+ columns.add(column);
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
new file mode 100644
index 0000000..a7fb7ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+import com.google.common.collect.Multimap;
+
+public class DelegateIndexFailurePolicy implements IndexFailurePolicy {
+
+ private final IndexFailurePolicy delegate;
+
+ public DelegateIndexFailurePolicy(IndexFailurePolicy delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+ throws IOException {
+ delegate.handleFailure(attempted, cause);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return delegate.isStopped();
+ }
+
+ @Override
+ public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+ delegate.setup(parent, env);
+ }
+
+ @Override
+ public void stop(String arg0) {
+ delegate.stop(arg0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 09a8676..c7ed49b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -49,10 +49,13 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
@@ -71,22 +74,19 @@ import com.google.common.collect.Multimap;
* region server. First attempts to disable the index and failing that falls
* back to the default behavior of killing the region server.
*
- * TODO: use delegate pattern instead
- *
- *
- * @since 2.1
*/
-public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
private RegionCoprocessorEnvironment env;
public PhoenixIndexFailurePolicy() {
+ super(new KillServerOnFailurePolicy());
}
@Override
public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
- super.setup(parent, env);
- this.env = env;
+ super.setup(parent, env);
+ this.env = env;
}
/**
@@ -101,9 +101,11 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
*/
@Override
public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+ boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
boolean throwing = true;
try {
- handleFailureWithExceptions(attempted, cause);
+ handleFailureWithExceptions(attempted, cause, blockWriteRebuildIndex);
throwing = false;
} catch (Throwable t) {
LOG.warn("handleFailure failed", t);
@@ -115,7 +117,7 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
}
private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
- Exception cause) throws Throwable {
+ Exception cause, boolean blockWriteRebuildIndex) throws Throwable {
Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
// start by looking at all the tables to which we attempted to write
@@ -157,8 +159,12 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
// Mimic the Put that gets generated by the client on an update of the index state
Put put = new Put(indexTableKey);
- put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
- PIndexState.DISABLE.getSerializedBytes());
+ if (blockWriteRebuildIndex)
+ put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+ PIndexState.ACTIVE.getSerializedBytes());
+ else
+ put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+ PIndexState.DISABLE.getSerializedBytes());
put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
PLong.INSTANCE.toBytes(minTimeStamp));
final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
@@ -194,12 +200,22 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
continue;
}
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
- LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
- + result.getMutationCode() + ". Will use default failure policy instead.");
- throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+ if (blockWriteRebuildIndex) {
+ LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
+ + result.getMutationCode());
+ throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
+ } else {
+ LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+ + result.getMutationCode() + ". Will use default failure policy instead.");
+ throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+ }
}
- LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
- cause);
+ if (blockWriteRebuildIndex)
+ LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.",
+ cause);
+ else
+ LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
+ cause);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index b0e7b6e..fe40d60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -129,6 +129,9 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB =
"phoenix.index.failure.handling.rebuild.interval";
+ // A master switch if to block writes when index build failed
+ public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write";
+
// Index will be partially re-built from index disable time stamp - following overlap time
public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =
"phoenix.index.failure.handling.rebuild.overlap.time";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1838b51..62297ee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -152,6 +152,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES = 1000;
public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
+ public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index e7bf961..b294f03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -32,6 +32,11 @@ public class DelegateTable implements PTable {
}
@Override
+ public long getIndexDisableTimestamp() {
+ return delegate.getIndexDisableTimestamp();
+ }
+
+ @Override
public long getSequenceNumber() {
return delegate.getSequenceNumber();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0456335..6409dcd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1098,29 +1098,33 @@ public class MetaDataClient {
/**
* Rebuild indexes from a timestamp which is the value from hbase row key timestamp field
*/
- public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef) throws SQLException {
- boolean needRestoreIndexState = false;
- // Need to change index state from Disable to InActive when build index partially so that
- // new changes will be indexed during index rebuilding
- AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
- alterIndex(indexStatement);
- needRestoreIndexState = true;
+ public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
+ boolean needRestoreIndexState = true;
+ AlterIndexStatement indexStatement = null;
+ if (!blockWriteRebuildIndex) {
+ // Need to change index state from Disable to InActive when build index partially so that
+ // new changes will be indexed during index rebuilding
+ indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
+ TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+ dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
+ alterIndex(indexStatement);
+ }
try {
buildIndex(index, dataTableRef);
needRestoreIndexState = false;
} finally {
if(needRestoreIndexState) {
- // reset index state to disable
- indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
- alterIndex(indexStatement);
+ if (!blockWriteRebuildIndex) {
+ // reset index state to disable
+ indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
+ TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+ dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
+ alterIndex(indexStatement);
+ }
}
}
}
-
+
/**
* Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
* MetaDataClient.createTable. In doing so, we perform the following translations:
@@ -2004,7 +2008,7 @@ public class MetaDataClient {
Collections.<PTable>emptyList(), isImmutableRows,
Collections.<PName>emptyList(), defaultFamilyName == null ? null :
PNameFactory.newName(defaultFamilyName), null,
- Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0);
+ Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L);
connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
} else if (tableType == PTableType.INDEX && indexId == null) {
if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -2174,7 +2178,7 @@ public class MetaDataClient {
PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
- indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency);
+ indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L);
result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
addTableToCache(result);
return table;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 4a338f6..b2a1d58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -157,6 +157,7 @@ public interface PTable extends PMetaDataEntity {
long getTimeStamp();
long getSequenceNumber();
+ long getIndexDisableTimestamp();
/**
* @return table name
*/
[4/5] phoenix git commit: PHOENIX-2602 Parser does not handle escaped
LPAREN
Posted by ja...@apache.org.
PHOENIX-2602 Parser does not handle escaped LPAREN
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/43b34da1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/43b34da1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/43b34da1
Branch: refs/heads/master
Commit: 43b34da1d4e10bef233bbb748c5dd1be11d7ce18
Parents: 046bda3
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Feb 15 01:44:31 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Feb 15 10:14:58 2016 -0800
----------------------------------------------------------------------
phoenix-core/src/main/antlr3/PhoenixSQL.g | 7 ++++---
.../test/java/org/apache/phoenix/parse/QueryParserTest.java | 6 ++++++
2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/43b34da1/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 0be5717..64e1d32 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -1213,14 +1213,14 @@ DIGIT
STRING_LITERAL
@init{ StringBuilder sb = new StringBuilder(); }
: '\''
- ( t=CHAR_ESC { sb.append(getText()); }
- | t=CHAR { sb.append(t.getText()); }
+ ( t=CHAR { sb.append(t.getText()); }
+ | t=CHAR_ESC { sb.append(getText()); }
)* '\'' { setText(sb.toString()); }
;
fragment
CHAR
- : ( ~('\'') )
+ : ( ~('\'' | '\\') )
;
fragment
@@ -1242,6 +1242,7 @@ CHAR_ESC
| '\\' { setText("\\"); }
| '_' { setText("\\_"); }
| '%' { setText("\\\%"); }
+ | { setText("\\"); }
)
| '\'\'' { setText("\'"); }
;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/43b34da1/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 5363042..70f590f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -766,4 +766,10 @@ public class QueryParserTest {
String sql = "select * from t where 'a' <= ALL(a-b+1)";
parseQuery(sql);
}
+
+ @Test
+ public void testDoubleBackslash() throws Exception {
+ String sql = "SELECT * FROM T WHERE A LIKE 'a\\(d'";
+ parseQuery(sql);
+ }
}