You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/12/16 18:49:09 UTC
[2/6] phoenix git commit: Sync 4.x-HBase-1.2 to master (Pedro Boado)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0faa20c..993438e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -96,6 +96,7 @@ import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class MutationState implements SQLCloseable {
private final long batchSize;
private final long batchSizeBytes;
private long batchCount = 0L;
- private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+ private final Map<TableRef, MultiRowMutationState> mutations;
private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
private long sizeOffset;
@@ -130,7 +131,7 @@ public class MutationState implements SQLCloseable {
private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
- private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+ private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
final PhoenixTransactionContext phoenixTransactionContext;
@@ -158,12 +159,12 @@ public class MutationState implements SQLCloseable {
}
private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
- this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
+ this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
this.sizeOffset = sizeOffset;
}
MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
- Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+ Map<TableRef, MultiRowMutationState> mutations,
boolean subTask, PhoenixTransactionContext txContext) {
this.maxSize = maxSize;
this.maxSizeBytes = maxSizeBytes;
@@ -188,15 +189,19 @@ public class MutationState implements SQLCloseable {
}
}
- public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+ public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
if (!mutations.isEmpty()) {
this.mutations.put(table, mutations);
}
this.numRows = mutations.size();
- this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations);
+ this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
throwIfTooBig();
}
+
+ public long getEstimatedSize() { // Accessor for this MutationState's estimatedSize field (presumably bytes -- TODO confirm unit).
+ return estimatedSize;
+ }
public long getMaxSize() {
return maxSize;
@@ -345,7 +350,7 @@ public class MutationState implements SQLCloseable {
}
public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { // Factory: builds a MutationState holding no table mutations.
- MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
+ MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null); // backed by the immutable Collections.emptyMap(); subTask=false, txContext=null
state.sizeOffset = 0; // explicitly zero the row-count offset on the returned state
return state;
}
@@ -367,12 +372,12 @@ public class MutationState implements SQLCloseable {
return sizeOffset + numRows;
}
- private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
- Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
+ Map<TableRef, MultiRowMutationState> dstMutations) {
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
boolean incrementRowCount = dstMutations == this.mutations;
- Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
+ MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
if (existingRows != null) { // Rows for that table already exist
// Loop through new rows and replace existing with new
for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
@@ -384,8 +389,12 @@ public class MutationState implements SQLCloseable {
Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
// if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
if (newRow != PRow.DELETE_MARKER) {
+ // decrement estimated size by the size of the old row
+ estimatedSize-=existingRowMutationState.calculateEstimatedSize();
// Merge existing column values with new column values
existingRowMutationState.join(rowEntry.getValue());
+ // increment estimated size by the size of the new row
+ estimatedSize+=existingRowMutationState.calculateEstimatedSize();
// Now that the existing row has been merged with the new row, replace it back
// again (since it was merged with the new one above).
existingRows.put(rowEntry.getKey(), existingRowMutationState);
@@ -394,6 +403,8 @@ public class MutationState implements SQLCloseable {
} else {
if (incrementRowCount && !isIndex) { // Don't count index rows in row count
numRows++;
+ // increment estimated size by the size of the new row
+ estimatedSize += rowEntry.getValue().calculateEstimatedSize();
}
}
}
@@ -401,22 +412,25 @@ public class MutationState implements SQLCloseable {
dstMutations.put(tableRef, existingRows);
} else {
// Size new map at batch size as that's what it'll likely grow to.
- Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+ MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
newRows.putAll(srcRows);
dstMutations.put(tableRef, newRows);
if (incrementRowCount && !isIndex) {
numRows += srcRows.size();
+ // if we added all the rows from newMutationState we can just increment the
+ // estimatedSize by newMutationState.estimatedSize
+ estimatedSize += srcRows.estimatedSize;
}
}
}
- private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations,
- Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+ private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, // srcMutations: per-table rows to merge in
+ Map<TableRef, MultiRowMutationState> dstMutations) { // dstMutations: map that receives the merged rows
// Merge newMutation with this one, keeping state from newMutation for any overlaps
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
+ for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
// Replace existing entries for the table with new entries
TableRef tableRef = entry.getKey();
- Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+ MultiRowMutationState srcRows = entry.getValue();
joinMutationState(tableRef, srcRows, dstMutations); // delegate to the per-table overload, once per source table
}
}
@@ -434,12 +448,7 @@ public class MutationState implements SQLCloseable {
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
this.sizeOffset += newMutationState.sizeOffset;
- int oldNumRows = this.numRows;
joinMutationState(newMutationState.mutations, this.mutations);
- // here we increment the estimated size by the fraction of new rows we added from the newMutationState
- if (newMutationState.numRows>0) {
- this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize;
- }
if (!newMutationState.txMutations.isEmpty()) {
if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -477,7 +486,7 @@ public class MutationState implements SQLCloseable {
return ptr;
}
- private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+ private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
final PTable table = tableRef.getTable();
final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
@@ -512,10 +521,10 @@ public class MutationState implements SQLCloseable {
// we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
if (!sendAll) {
TableRef key = new TableRef(index);
- Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
- if (rowToColumnMap!=null) {
+ MultiRowMutationState multiRowMutationState = mutations.remove(key);
+ if (multiRowMutationState!=null) {
final List<Mutation> deleteMutations = Lists.newArrayList();
- generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null);
+ generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
indexMutations.addAll(deleteMutations);
}
}
@@ -534,14 +543,14 @@ public class MutationState implements SQLCloseable {
}
private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
- final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values,
+ final long serverTimestamp, final MultiRowMutationState values,
final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
final PTable table = tableRef.getTable();
boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
values.entrySet().iterator();
long timestampToUse = mutationTimestamp;
- Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap();
+ MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
while (iterator.hasNext()) {
Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
@@ -616,7 +625,7 @@ public class MutationState implements SQLCloseable {
}
public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
- final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+ final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
if (!iterator.hasNext()) {
return Collections.emptyIterator();
}
@@ -624,7 +633,7 @@ public class MutationState implements SQLCloseable {
final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
final long mutationTimestamp = getMutationTimestamp(scn);
return new Iterator<Pair<byte[],List<Mutation>>>() {
- private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
+ private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -688,14 +697,14 @@ public class MutationState implements SQLCloseable {
private long[] validateAll() throws SQLException { // Validates every table in this.mutations and collects one server timestamp per table.
int i = 0;
long[] timeStamps = new long[this.mutations.size()]; // one slot per table entry
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+ for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) {
TableRef tableRef = entry.getKey();
timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue()); // filled in entrySet() iteration order
}
return timeStamps;
}
- private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+ private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException {
Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long serverTimeStamp = tableRef.getTimeStamp();
@@ -907,7 +916,7 @@ public class MutationState implements SQLCloseable {
sendAll = true;
}
- Map<ImmutableBytesPtr, RowMutationState> valuesMap;
+ MultiRowMutationState multiRowMutationState;
Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
// add tracing for this operation
try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
@@ -916,16 +925,16 @@ public class MutationState implements SQLCloseable {
while (tableRefIterator.hasNext()) {
// at this point we are going through mutations for each table
final TableRef tableRef = tableRefIterator.next();
- valuesMap = mutations.get(tableRef);
- if (valuesMap == null || valuesMap.isEmpty()) {
+ multiRowMutationState = mutations.get(tableRef);
+ if (multiRowMutationState == null || multiRowMutationState.isEmpty()) {
continue;
}
// Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
- long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
+ long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
Long scn = connection.getSCN();
long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
final PTable table = tableRef.getTable();
- Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll);
+ Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
// build map from physical table to mutation list
boolean isDataTable = true;
while (mutationsIterator.hasNext()) {
@@ -943,7 +952,7 @@ public class MutationState implements SQLCloseable {
// involved in the transaction since none of them would have been
// committed in the event of a failure.
if (table.isTransactional()) {
- addUncommittedStatementIndexes(valuesMap.values());
+ addUncommittedStatementIndexes(multiRowMutationState.values());
if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
}
@@ -952,7 +961,7 @@ public class MutationState implements SQLCloseable {
// in the event that we need to replay the commit.
// Copy TableRef so we have the original PTable and know when the
// indexes have changed.
- joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
+ joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
}
}
long serverTimestamp = HConstants.LATEST_TIMESTAMP;
@@ -974,8 +983,6 @@ public class MutationState implements SQLCloseable {
long mutationCommitTime = 0;
long numFailedMutations = 0;;
long startTime = 0;
- long startNumRows = numRows;
- long startEstimatedSize = estimatedSize;
do {
TableRef origTableRef = tableInfo.getOrigTableRef();
PTable table = origTableRef.getTable();
@@ -1021,13 +1028,13 @@ public class MutationState implements SQLCloseable {
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
numFailedMutations = 0;
+ // Remove batches as we process them
+ mutations.remove(origTableRef);
if (tableInfo.isDataTable()) {
numRows -= numMutations;
- // decrement estimated size by the fraction of rows we sent to hbase
- estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize;
+ // recalculate the estimated size
+ estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
}
- // Remove batches as we process them
- mutations.remove(origTableRef);
} catch (Exception e) {
mutationCommitTime = System.currentTimeMillis() - startTime;
serverTimestamp = ServerUtil.parseServerTimestamp(e);
@@ -1178,7 +1185,7 @@ public class MutationState implements SQLCloseable {
}
private int[] getUncommittedStatementIndexes() {
- for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+ for (MultiRowMutationState rowMutationMap : mutations.values()) {
addUncommittedStatementIndexes(rowMutationMap.values());
}
return uncommittedStatementIndexes;
@@ -1211,7 +1218,7 @@ public class MutationState implements SQLCloseable {
}
public void commit() throws SQLException {
- Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+ Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
int retryCount = 0;
do {
boolean sendSuccessful=false;
@@ -1421,13 +1428,54 @@ public class MutationState implements SQLCloseable {
}
}
+ public static class MultiRowMutationState { // Wraps a row-key -> RowMutationState map and keeps a running size estimate alongside it.
+ private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState; // the wrapped per-row mutation map
+ private long estimatedSize; // sum of calculateEstimatedSize() of rows added via put/putAll (see review notes below)
+
+ public MultiRowMutationState(int size) { // size: expected number of rows, used only to pre-size the backing map
+ this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
+ this.estimatedSize = 0;
+ }
+
+ public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { // Stores the row and returns the previous mapping for ptr, if any.
+ estimatedSize += rowMutationState.calculateEstimatedSize(); // NOTE(review): a replaced row's size is not subtracted here -- confirm callers compensate
+ return rowKeyToRowMutationState.put(ptr, rowMutationState);
+ }
+
+ public void putAll(MultiRowMutationState other) { // Copies every row of other into this map.
+ estimatedSize += other.estimatedSize; // NOTE(review): over-counts if other shares row keys with this map -- confirm keys are disjoint at call sites
+ rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
+ }
+
+ public boolean isEmpty() { // true when no rows are held
+ return rowKeyToRowMutationState.isEmpty();
+ }
+
+ public int size() { // number of rows held
+ return rowKeyToRowMutationState.size();
+ }
+
+ public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() { // live view of the backing map's entries, not a copy
+ return rowKeyToRowMutationState.entrySet();
+ }
+
+ public void clear(){ // Removes all rows. NOTE(review): estimatedSize is left unchanged here -- confirm whether it should be reset to 0
+ rowKeyToRowMutationState.clear();
+ }
+
+ public Collection<RowMutationState> values() { // live view of the backing map's values, not a copy
+ return rowKeyToRowMutationState.values();
+ }
+ }
+
public static class RowMutationState {
@Nonnull private Map<PColumn,byte[]> columnValues;
private int[] statementIndexes;
@Nonnull private final RowTimestampColInfo rowTsColInfo;
private byte[] onDupKeyBytes;
+ private long colValuesSize;
- public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+ public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
byte[] onDupKeyBytes) {
checkNotNull(columnValues);
checkNotNull(rowTsColInfo);
@@ -1435,6 +1483,12 @@ public class MutationState implements SQLCloseable {
this.statementIndexes = new int[] {statementIndex};
this.rowTsColInfo = rowTsColInfo;
this.onDupKeyBytes = onDupKeyBytes;
+ this.colValuesSize = colValuesSize;
+ }
+
+ public long calculateEstimatedSize() { // Recomputed on every call from the current field values; nothing is cached.
+ return colValuesSize + statementIndexes.length * SizedUtil.INT_SIZE + SizedUtil.LONG_SIZE // column values + one int per statement index + one long (presumably for the colValuesSize field itself -- TODO confirm)
+ + (onDupKeyBytes != null ? onDupKeyBytes.length : 0); // ON DUPLICATE KEY bytes contribute only when present
+ }
byte[] getOnDupKeyBytes() {
@@ -1453,7 +1507,16 @@ public class MutationState implements SQLCloseable {
// If we already have a row and the new row has an ON DUPLICATE KEY clause
// ignore the new values (as that's what the server will do).
if (newRow.onDupKeyBytes == null) {
- getColumnValues().putAll(newRow.getColumnValues());
+ // increment the column value size by the new row column value size
+ colValuesSize+=newRow.colValuesSize;
+ for (Map.Entry<PColumn,byte[]> entry : newRow.columnValues.entrySet()) {
+ PColumn col = entry.getKey();
+ byte[] oldValue = columnValues.put(col, entry.getValue());
+ if (oldValue!=null) {
+ // decrement column value size by the size of all column values that were replaced
+ colValuesSize-=(col.getEstimatedSize() + oldValue.length);
+ }
+ }
}
// Concatenate ON DUPLICATE KEY bytes to allow multiple
// increments of the same row in the same commit batch.
@@ -1465,7 +1528,7 @@ public class MutationState implements SQLCloseable {
RowTimestampColInfo getRowTimestampColInfo() {
return rowTsColInfo;
}
-
+
}
public ReadMetricQueue getReadMetricQueue() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 1e1cb0d..31d7097 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -53,6 +53,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -64,6 +65,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -189,6 +191,29 @@ public class ScanPlan extends BaseQueryPlan {
}
@Override
+ public Cost getCost() { // Cost of this scan derived from the estimated bytes to scan; Cost.UNKNOWN when no estimate is available.
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // ignored.
+ } // on failure byteCount stays null and falls through to the UNKNOWN branch below
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, byteCount); // only the third component is populated (presumably the I/O term -- TODO confirm Cost's argument order)
+ int parallelLevel = CostUtil.estimateParallelLevel( // NOTE(review): computed unconditionally but only consumed inside the ORDER BY branch
+ true, context.getConnection().getQueryServices());
+ if (!orderBy.getOrderByExpressions().isEmpty()) { // add the sort cost only when there are ORDER BY expressions
+ Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+ cost = cost.plus(orderByCost);
+ }
+ return cost;
+ }
+
+ @Override
public List<KeyRange> getSplits() {
if (splits == null)
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..3e380da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.query.KeyRange;
@@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan {
}
@Override
+ public Cost getCost() { // Cost of the join: this plan's own byte estimate plus the costs of both child plans.
+ Long byteCount = null;
+ try {
+ byteCount = getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // ignored.
+ } // on failure byteCount stays null and falls through to the UNKNOWN branch below
+
+ if (byteCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, byteCount); // only the third component is populated (presumably the I/O term -- TODO confirm Cost's argument order)
+ return cost.plus(lhsPlan.getCost()).plus(rhsPlan.getCost()); // NOTE(review): how plus() treats an UNKNOWN child cost is not visible here -- confirm
+ }
+
+ @Override
public StatementContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e06522f..e6bf654 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.UnionResultIterators;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan {
}
@Override
+ public Cost getCost() { // Cost of the union: the sum of the costs of all sub-plans.
+ Cost cost = Cost.ZERO; // accumulator seed; remains ZERO when plans is empty
+ for (QueryPlan plan : plans) {
+ cost = cost.plus(plan.getCost()); // reassigns to the result of plus() on each iteration
+ }
+ return cost;
+ }
+
+ @Override
public ParameterMetaData getParameterMetaData() {
return paramMetaData;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
index 15f6e3e..9bb7234 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
@@ -199,8 +199,8 @@ public class RowValueConstructorExpression extends BaseCompoundExpression {
// as otherwise we need it to ensure sort order is correct
for (int k = expressionCount -1 ;
k >=0 && getChildren().get(k).getDataType() != null
- && !getChildren().get(k).getDataType().isFixedWidth()
- && outputBytes[outputSize-1] == QueryConstants.SEPARATOR_BYTE ; k--) {
+ && !getChildren().get(k).getDataType().isFixedWidth()
+ && outputBytes[outputSize-1] == SchemaUtil.getSeparatorByte(true, false, getChildren().get(k)) ; k--) {
outputSize--;
}
ptr.set(outputBytes, 0, outputSize);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 0fc138f..ba6371b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.index;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -161,12 +163,12 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
}
private long handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
- Exception cause) throws Throwable {
+ final Exception cause) throws Throwable {
Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
- Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
+ final Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
// start by looking at all the tables to which we attempted to write
long timestamp = 0;
- boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
+ final boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
// if using TrackingParallelWriter, we know which indexes failed and only disable those
Set<HTableInterfaceReference> failedTables = cause instanceof MultiIndexWriteFailureException
? new HashSet<HTableInterfaceReference>(((MultiIndexWriteFailureException)cause).getFailedTables())
@@ -210,55 +212,66 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
return timestamp;
}
- PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE;
+ final PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE;
+ final long fTimestamp=timestamp;
// for all the index tables that we've found, try to disable them and if that fails, try to
- for (Map.Entry<String, Long> tableTimeElement :indexTableNames.entrySet()){
- String indexTableName = tableTimeElement.getKey();
- long minTimeStamp = tableTimeElement.getValue();
- // We need a way of differentiating the block writes to data table case from
- // the leave index active case. In either case, we need to know the time stamp
- // at which writes started failing so we can rebuild from that point. If we
- // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
- // then writes to the data table will be blocked (this is client side logic
- // and we can't change this in a minor release). So we use the sign of the
- // time stamp to differentiate.
- if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
- minTimeStamp *= -1;
- }
- // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
- try (HTableInterface systemTable = env.getTable(SchemaUtil
- .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
- MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp,
- systemTable, newState);
- if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
- LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
- continue;
- }
- if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
- if (leaveIndexActive) {
- LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
- + result.getMutationCode());
- // If we're not disabling the index, then we don't want to throw as throwing
- // will lead to the RS being shutdown.
- if (blockDataTableWritesOnFailure) {
- throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
+ return User.runAsLoginUser(new PrivilegedExceptionAction<Long>() {
+ @Override
+ public Long run() throws Exception {
+ for (Map.Entry<String, Long> tableTimeElement : indexTableNames.entrySet()) {
+ String indexTableName = tableTimeElement.getKey();
+ long minTimeStamp = tableTimeElement.getValue();
+ // We need a way of differentiating the block writes to data table case from
+ // the leave index active case. In either case, we need to know the time stamp
+ // at which writes started failing so we can rebuild from that point. If we
+ // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+ // then writes to the data table will be blocked (this is client side logic
+ // and we can't change this in a minor release). So we use the sign of the
+ // time stamp to differentiate.
+ if (!disableIndexOnFailure && !blockDataTableWritesOnFailure) {
+ minTimeStamp *= -1;
+ }
+ // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
+ try (HTableInterface systemTable = env.getTable(SchemaUtil.getPhysicalTableName(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
+ MetaDataMutationResult result = IndexUtil.updateIndexState(indexTableName, minTimeStamp,
+ systemTable, newState);
+ if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+ LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
+ continue;
+ }
+ if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+ if (leaveIndexActive) {
+ LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP " + " failed with code = "
+ + result.getMutationCode());
+ // If we're not disabling the index, then we don't want to throw as throwing
+ // will lead to the RS being shutdown.
+ if (blockDataTableWritesOnFailure) { throw new DoNotRetryIOException(
+ "Attempt to update INDEX_DISABLE_TIMESTAMP failed."); }
+ } else {
+ LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+ + result.getMutationCode() + ". Will use default failure policy instead.");
+ throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+ }
+ }
+ if (leaveIndexActive)
+ LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName
+ + " due to an exception while writing updates.", cause);
+ else
+ LOG.info("Successfully disabled index " + indexTableName
+ + " due to an exception while writing updates.", cause);
+ } catch (Throwable t) {
+ if (t instanceof Exception) {
+ throw (Exception)t;
+ } else {
+ throw new Exception(t);
}
- } else {
- LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
- + result.getMutationCode() + ". Will use default failure policy instead.");
- throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
- }
+ }
}
- if (leaveIndexActive)
- LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.",
- cause);
- else
- LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
- cause);
+ // Return the cell time stamp (note they should all be the same)
+ return fTimestamp;
}
- }
- // Return the cell time stamp (note they should all be the same)
- return timestamp;
+ });
}
private Collection<? extends String> getLocalIndexNames(HTableInterfaceReference ref,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d35cce1..c699088 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,12 +91,14 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.iterate.MaterializedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AddJarsStatement;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.AlterSessionStatement;
import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.ChangePermsStatement;
import org.apache.phoenix.parse.CloseStatement;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.ColumnName;
@@ -212,8 +214,9 @@ public class PhoenixStatement implements Statement, SQLCloseable {
QUERY("queried", false),
DELETE("deleted", true),
UPSERT("upserted", true),
- UPGRADE("upgrade", true);
-
+ UPGRADE("upgrade", true),
+ ADMIN("admin", true);
+
private final String toString;
private final boolean isMutation;
Operation(String toString, boolean isMutation) {
@@ -645,6 +648,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
+ public Cost getCost() {
+ // Always reports the shared zero-cost constant.
+ return Cost.ZERO;
+ }
+
+ @Override
public TableRef getTableRef() {
return null;
}
@@ -1153,6 +1161,33 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
}
+    /**
+     * Executable form of a GRANT/REVOKE statement. Compiling it yields a {@link MutationPlan}
+     * whose execution passes this statement to {@code MetaDataClient.changePermissions}.
+     */
+    private static class ExecutableChangePermsStatement extends ChangePermsStatement implements CompilableStatement {
+
+        public ExecutableChangePermsStatement(String permsString, boolean isSchemaName, TableName tableName,
+                String schemaName, boolean isGroupName, LiteralParseNode userOrGroup, boolean isGrantStatement) {
+            super(permsString, isSchemaName, tableName, schemaName, isGroupName, userOrGroup, isGrantStatement);
+        }
+
+        /**
+         * Builds the plan that applies this permission change.
+         *
+         * @param stmt statement used to create the plan's {@link StatementContext}
+         * @param seqAction not used by this statement
+         * @return a plan whose {@code execute()} calls {@code MetaDataClient.changePermissions}
+         */
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            final StatementContext context = new StatementContext(stmt);
+
+            return new BaseMutationPlan(context, this.getOperation()) {
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    // The same class backs both GRANT and REVOKE (told apart only by
+                    // isGrantStatement()), so describe the statement that was actually issued
+                    // instead of always reporting a GRANT.
+                    String step = ExecutableChangePermsStatement.this.isGrantStatement()
+                            ? "GRANT PERMISSION"
+                            : "REVOKE PERMISSION";
+                    return new ExplainPlan(Collections.singletonList(step));
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getContext().getConnection());
+                    return client.changePermissions(ExecutableChangePermsStatement.this);
+                }
+            };
+        }
+    }
+
private static class ExecutableDropIndexStatement extends DropIndexStatement implements CompilableStatement {
public ExecutableDropIndexStatement(NamedNode indexName, TableName tableName, boolean ifExists) {
@@ -1181,8 +1216,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
- public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
- super(indexTableNode, dataTableName, ifExists, state, async);
+ public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ super(indexTableNode, dataTableName, ifExists, state, async, props);
}
@SuppressWarnings("unchecked")
@@ -1313,11 +1348,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
public ExplainPlan getExplainPlan() throws SQLException {
return new ExplainPlan(Collections.singletonList("EXECUTE UPGRADE"));
}
-
+
@Override
- public StatementContext getContext() {
- return new StatementContext(stmt);
- }
+ public QueryPlan getQueryPlan() { return null; }
+
+ @Override
+ public StatementContext getContext() { return new StatementContext(stmt); }
@Override
public TableRef getTargetRef() {
@@ -1527,10 +1563,10 @@ public class PhoenixStatement implements Statement, SQLCloseable {
public DropIndexStatement dropIndex(NamedNode indexName, TableName tableName, boolean ifExists) {
return new ExecutableDropIndexStatement(indexName, tableName, ifExists);
}
-
+
@Override
- public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
- return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
+ public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async, props);
}
@Override
@@ -1557,6 +1593,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
public ExecuteUpgradeStatement executeUpgrade() {
return new ExecutableExecuteUpgradeStatement();
}
+
+        /**
+         * Creates the executable GRANT/REVOKE statement node; every argument is forwarded
+         * unchanged to the {@link ExecutableChangePermsStatement} constructor.
+         */
+        @Override
+        public ExecutableChangePermsStatement changePermsStatement(String permsString, boolean isSchemaName,
+                TableName tableName, String schemaName, boolean isGroupName, LiteralParseNode userOrGroup,
+                boolean isGrantStatement) {
+            ExecutableChangePermsStatement executable = new ExecutableChangePermsStatement(
+                    permsString, isSchemaName, tableName, schemaName, isGroupName, userOrGroup, isGrantStatement);
+            return executable;
+        }
+
}
static class PhoenixStatementParser extends SQLParser {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
index e55b977..4217e40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.mapreduce;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +38,15 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
*/
public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+ private final Set<String> propsToIgnore;
+
+ /**
+ * Creates an output format with an empty {@code propsToIgnore} set.
+ */
+ public PhoenixOutputFormat() {
+ this(Collections.<String>emptySet());
+ }
+
+ /**
+ * Creates an output format that hands the given set to every {@code PhoenixRecordWriter}
+ * it creates in {@code getRecordWriter}.
+ *
+ * @param propsToIgnore stored as-is (no defensive copy); presumably the names of
+ * configuration properties to leave out of the output connection - confirm against
+ * PhoenixRecordWriter
+ */
+ public PhoenixOutputFormat(Set<String> propsToIgnore) {
+ this.propsToIgnore = propsToIgnore;
+ }
@Override
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
@@ -52,7 +63,7 @@ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<Nul
@Override
public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
try {
- return new PhoenixRecordWriter<T>(context.getConfiguration());
+ return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore);
} catch (SQLException e) {
LOG.error("Error calling PhoenixRecordWriter " + e.getMessage());
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 70ee3f5..52f2fe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +48,11 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul
private long numRecords = 0;
public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
- this.conn = ConnectionUtil.getOutputConnection(configuration);
+ this(configuration, Collections.<String>emptySet());
+ }
+
+ public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
+ this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.statement = this.conn.prepareStatement(upsertQuery);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index ada3816..56a5ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.mapreduce.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.Collections;
import java.util.Properties;
+import java.util.Set;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
+import com.google.common.base.Preconditions;
+
/**
* Utility class to return a {@link Connection} .
*/
@@ -74,15 +75,29 @@ public class ConnectionUtil {
* Create the configured output Connection.
*
* @param conf configuration containing the connection information
+ * @return the configured output connection
+ */
+ public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException {
+ // No caller-supplied connection properties: start from an empty Properties object and
+ // pass ignoreTheseProps through to the three-argument getOutputConnection overload.
+ return getOutputConnection(conf, new Properties(), ignoreTheseProps);
+ }
+
+ /**
+ * Create the configured output Connection.
+ *
+ * @param conf configuration containing the connection information
* @param props custom connection properties
* @return the configured output connection
*/
public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+ return getOutputConnection(conf, props, Collections.<String>emptySet());
+ }
+
+ public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException {
Preconditions.checkNotNull(conf);
return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
PhoenixConfigurationUtil.getClientPort(conf),
PhoenixConfigurationUtil.getZNodeParent(conf),
- PropertiesUtil.combineProperties(props, conf));
+ PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
new file mode 100644
index 0000000..b83f354
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import java.util.Objects;
+
+/**
+ * Optimizer cost in terms of CPU, memory, and I/O usage, the unit of which is currently the
+ * number of bytes processed.
+ * <p>
+ * Ordering and equality are currently based on the I/O component alone (see
+ * {@link #compareTo(Cost)}); {@link #UNKNOWN} orders after every other cost.
+ */
+public class Cost implements Comparable<Cost> {
+    /** The unknown cost. It is recognized by identity, see {@link #isUnknown()}. */
+    public static final Cost UNKNOWN = new Cost(Double.NaN, Double.NaN, Double.NaN) {
+        @Override
+        public String toString() {
+            return "{unknown}";
+        }
+    };
+
+    /** The zero cost. */
+    public static final Cost ZERO = new Cost(0, 0, 0) {
+        @Override
+        public String toString() {
+            return "{zero}";
+        }
+    };
+
+    private final double cpu;
+    private final double memory;
+    private final double io;
+
+    /**
+     * @param cpu the CPU component of the cost
+     * @param memory the memory component of the cost
+     * @param io the I/O component of the cost
+     */
+    public Cost(double cpu, double memory, double io) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.io = io;
+    }
+
+    public double getCpu() {
+        return cpu;
+    }
+
+    public double getMemory() {
+        return memory;
+    }
+
+    public double getIo() {
+        return io;
+    }
+
+    /**
+     * @return {@code true} only for the {@link #UNKNOWN} instance itself; a separately
+     *         constructed cost with NaN components is not considered unknown
+     */
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    /**
+     * Adds two costs component by component.
+     *
+     * @param other the cost to add; must not be {@code null}
+     * @return {@link #UNKNOWN} if either operand is unknown, otherwise a new cost holding the sums
+     */
+    public Cost plus(Cost other) {
+        if (isUnknown() || other.isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu + other.cpu,
+                this.memory + other.memory,
+                this.io + other.io);
+    }
+
+    /**
+     * Scales every component by the same factor.
+     *
+     * @param factor the multiplier
+     * @return {@link #UNKNOWN} if this cost is unknown, otherwise a new scaled cost
+     */
+    public Cost multiplyBy(double factor) {
+        if (isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu * factor,
+                this.memory * factor,
+                this.io * factor);
+    }
+
+    /**
+     * Orders costs by their I/O component, with {@link #UNKNOWN} after every known cost.
+     */
+    // TODO right now for simplicity, we choose to ignore CPU and memory costs. We may
+    // take those into account as our cost model matures.
+    @Override
+    public int compareTo(Cost other) {
+        if (isUnknown() && other.isUnknown()) {
+            return 0;
+        } else if (isUnknown()) {
+            return 1;
+        } else if (other.isUnknown()) {
+            return -1;
+        }
+
+        // Compare directly rather than by subtraction: "infinity - infinity" is NaN, which
+        // made two equal infinite costs compare as -1 in both directions. The == test keeps
+        // 0.0 and -0.0 equal; Double.compare then orders everything else (NaN last).
+        if (this.io == other.io) {
+            return 0;
+        }
+        return Double.compare(this.io, other.io);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj
+                || (obj instanceof Cost && this.compareTo((Cost) obj) == 0);
+    }
+
+    @Override
+    public int hashCode() {
+        // Hash only what equals() compares (the I/O component) so that equal costs always
+        // share a hash code. Adding 0.0 folds -0.0 into 0.0, which compareTo treats as equal.
+        return Objects.hash(io + 0.0);
+    }
+
+    @Override
+    public String toString() {
+        return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}";
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index b3df50b..64dad58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -67,10 +67,12 @@ public class QueryOptimizer {
private final QueryServices services;
private final boolean useIndexes;
+ private final boolean costBased;
public QueryOptimizer(QueryServices services) {
this.services = services;
this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
+ this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
}
public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException {
@@ -91,7 +93,7 @@ public class QueryOptimizer {
}
public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
- List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
+ List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true);
return plans.get(0);
}
@@ -309,10 +311,11 @@ public class QueryOptimizer {
}
return null;
}
-
+
/**
* Order the plans among all the possible ones from best to worst.
- * Since we don't keep stats yet, we use the following simple algorithm:
+ * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on
+ * their costs, otherwise we use the following simple algorithm:
* 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose that one immediately.
* 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression
* in the same order as the row key columns.
@@ -320,9 +323,6 @@ public class QueryOptimizer {
* a) the most row key columns that may be used to form the start/stop scan key (i.e. bound slots).
* b) the plan that preserves ordering for a group by.
* c) the non local index table plan
- * TODO: We should make more of a cost based choice: The largest number of bound slots does not necessarily
- * correspond to the least bytes scanned. We could consider the slots bound for upper and lower ranges
- * separately, or we could calculate the bytes scanned between the start and stop row of each table.
* @param plans the list of candidate plans
* @return list of plans ordered from best to worst.
*/
@@ -331,7 +331,21 @@ public class QueryOptimizer {
if (plans.size() == 1) {
return plans;
}
-
+
+ if (this.costBased) {
+ Collections.sort(plans, new Comparator<QueryPlan>() {
+ @Override
+ public int compare(QueryPlan plan1, QueryPlan plan2) {
+ return plan1.getCost().compareTo(plan2.getCost());
+ }
+ });
+ // Return ordered list based on cost if stats are available; otherwise fall
+ // back to static ordering.
+ if (!plans.get(0).getCost().isUnknown()) {
+ return stopAtBestPlan ? plans.subList(0, 1) : plans;
+ }
+ }
+
/**
* If we have a plan(s) that are just point lookups (i.e. fully qualified row
* keys), then favor those first.
@@ -428,7 +442,7 @@ public class QueryOptimizer {
}
});
-
+
return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
index 1890d31..678e560 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AddColumnStatement.java
@@ -48,4 +48,4 @@ public class AddColumnStatement extends AlterTableStatement {
public ListMultimap<String,Pair<String,Object>> getProps() {
return props;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
index 11328c2..de04505 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
@@ -17,20 +17,31 @@
*/
package org.apache.phoenix.parse;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
public class AlterIndexStatement extends SingleTableStatement {
private final String dataTableName;
private final boolean ifExists;
private final PIndexState indexState;
private boolean async;
+ private ListMultimap<String,Pair<String,Object>> props;
+ private static final PTableType tableType=PTableType.INDEX;
public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async) {
+ this(indexTableNode,dataTableName,ifExists,indexState,async,null);
+ }
+
+ public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async, ListMultimap<String,Pair<String,Object>> props) {
super(indexTableNode,0);
this.dataTableName = dataTableName;
this.ifExists = ifExists;
this.indexState = indexState;
this.async = async;
+ this.props= props==null ? ImmutableListMultimap.<String,Pair<String,Object>>of() : props;
}
public String getTableName() {
@@ -54,4 +65,7 @@ public class AlterIndexStatement extends SingleTableStatement {
return async;
}
+    /**
+     * @return the properties given for this ALTER INDEX statement (the constructor stores an
+     *         empty multimap when it is passed {@code null})
+     */
+    public ListMultimap<String,Pair<String,Object>> getProps() {
+        return this.props;
+    }
+
+    /**
+     * @return the class-level table type constant, which is {@code PTableType.INDEX}
+     */
+    public PTableType getTableType() {
+        return AlterIndexStatement.tableType;
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java
new file mode 100644
index 0000000..0eae26f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ChangePermsStatement.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.antlr.runtime.RecognitionException;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.phoenix.exception.PhoenixParserException;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.util.Arrays;
+
+/**
+ * See PHOENIX-672: use GRANT/REVOKE statements to assign or remove permissions for a user OR
+ * group on a table OR namespace. Permissions are managed by HBase using the hbase:acl table;
+ * allowed permissions are RWXCA.
+ */
+public class ChangePermsStatement implements BindableStatement {
+
+    private final Permission.Action[] permsList;
+    private final TableName tableName;
+    private final String schemaName;
+    private final String name;
+    // Grant/Revoke statements are differentiated based on this boolean
+    private final boolean isGrantStatement;
+
+    /**
+     * @param permsString permission codes to parse into actions, or {@code null} when none were given
+     * @param isSchemaName {@code true} to target {@code schemaName}, {@code false} to target {@code tableName}
+     * @param tableName target table; stored only when {@code isSchemaName} is {@code false}
+     * @param schemaName target schema; normalized and stored only when {@code isSchemaName} is {@code true}
+     * @param isGroupName {@code true} if {@code ugNode} names a group rather than a user
+     * @param ugNode literal holding the user or group name
+     * @param isGrantStatement {@code true} for GRANT, {@code false} for REVOKE
+     */
+    public ChangePermsStatement(String permsString, boolean isSchemaName,
+            TableName tableName, String schemaName, boolean isGroupName, LiteralParseNode ugNode, boolean isGrantStatement) {
+        // PHOENIX-672 HBase API doesn't allow to revoke specific permissions, hence this parameter will be ignored here.
+        // To comply with SQL standards, we may support the user given permissions to revoke specific permissions in future.
+        // GRANT permissions statement requires this parameter and the parsing will fail if it is not specified in SQL
+        if (permsString != null) {
+            // Encode with an explicit charset so the action codes do not depend on the JVM's
+            // platform default encoding.
+            Permission permission = new Permission(permsString.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            this.permsList = permission.getActions();
+        } else {
+            this.permsList = null;
+        }
+        // Exactly one of schemaName / tableName is kept; the other stays null.
+        if (isSchemaName) {
+            this.schemaName = SchemaUtil.normalizeIdentifier(schemaName);
+            this.tableName = null;
+        } else {
+            this.schemaName = null;
+            this.tableName = tableName;
+        }
+        String principal = SchemaUtil.normalizeLiteral(ugNode);
+        this.name = isGroupName ? AuthUtil.toGroupEntry(principal) : principal;
+        this.isGrantStatement = isGrantStatement;
+    }
+
+    /**
+     * @return the parsed permission actions (the internal array, not a copy), or {@code null}
+     *         when no permission string was supplied
+     */
+    public Permission.Action[] getPermsList() {
+        return permsList;
+    }
+
+    /**
+     * @return the user name, or the group entry produced by {@code AuthUtil.toGroupEntry} when
+     *         the statement targets a group
+     */
+    public String getName() {
+        return name;
+    }
+
+    /** @return the target table, or {@code null} when the statement targets a schema */
+    public TableName getTableName() {
+        return tableName;
+    }
+
+    /** @return the normalized target schema, or {@code null} when the statement targets a table */
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    public boolean isGrantStatement() {
+        return isGrantStatement;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append(this.isGrantStatement() ? "GRANT " : "REVOKE ");
+        buffer.append("permissions requested for user/group: ").append(this.getName());
+        if (this.getSchemaName() != null) {
+            buffer.append(" for Schema: ").append(this.getSchemaName());
+        } else if (this.getTableName() != null) {
+            buffer.append(" for Table: ").append(this.getTableName());
+        }
+        buffer.append(" Permissions: ").append(Arrays.toString(this.getPermsList()));
+        return buffer.toString();
+    }
+
+    @Override
+    public int getBindCount() {
+        return 0;
+    }
+
+    @Override
+    public PhoenixStatement.Operation getOperation() {
+        return PhoenixStatement.Operation.ADMIN;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
index 7c255cb..f5ab3f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSchemaStatement.java
@@ -24,7 +24,7 @@ public class CreateSchemaStatement extends MutableStatement {
private final boolean ifNotExists;
public CreateSchemaStatement(String schemaName,boolean ifNotExists) {
- this.schemaName = null == schemaName ? SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE : schemaName;
+ this.schemaName = schemaName;
this.ifNotExists = ifNotExists;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 0058f38..9be59f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ArrayListMultimap;
@@ -374,10 +373,10 @@ public class ParseNodeFactory {
return new DropIndexStatement(indexName, tableName, ifExists);
}
- public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
- return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
+ public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async, props);
}
-
+
public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, false);
}
@@ -925,4 +924,10 @@ public class ParseNodeFactory {
public UseSchemaStatement useSchema(String schemaName) {
return new UseSchemaStatement(schemaName);
}
+
+ public ChangePermsStatement changePermsStatement(String permsString, boolean isSchemaName, TableName tableName
+ , String schemaName, boolean isGroupName, LiteralParseNode userOrGroup, boolean isGrantStatement) {
+ return new ChangePermsStatement(permsString, isSchemaName, tableName, schemaName, isGroupName, userOrGroup, isGrantStatement);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 45ab5fa..90f8089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -86,6 +86,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<PColumn> columns) throws SQLException;
public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;
public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException;
+ public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName, Map<String, List<Pair<String,Object>>> stmtProperties, PTable table) throws SQLException;
+
public MutationState updateData(MutationPlan plan) throws SQLException;
public void init(String url, Properties props) throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 532b586..072bf28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
@@ -190,6 +191,7 @@ import org.apache.phoenix.schema.EmptySequenceCacheException;
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
+import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
@@ -867,7 +869,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
- if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
+ if ((SchemaUtil.isStatsTable(tableName) || SchemaUtil.isMetaTable(tableName))
+ && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
null, priority, null);
}
@@ -1223,7 +1226,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES)
.setMessage(
"Ensure that config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
- + " is consitent on client and server.")
+ + " is consistent on client and server.")
.build().buildException(); }
lowestClusterHBaseVersion = minHBaseVersion;
} catch (SQLException e) {
@@ -1721,7 +1724,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
boolean nonTxToTx = false;
- Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, families, tableProps);
+ Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, colFamiliesForPColumnsToBeAdded, tableProps);
HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
if (tableDescriptor != null) {
@@ -1939,7 +1942,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
}
- private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<Pair<byte[], Map<String, Object>>> families, Map<String, Object> tableProps) throws SQLException {
+ private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties,
+ Set<String> colFamiliesForPColumnsToBeAdded, Map<String, Object> tableProps) throws SQLException {
Map<String, Map<String, Object>> stmtFamiliesPropsMap = new HashMap<>(properties.size());
Map<String,Object> commonFamilyProps = new HashMap<>();
boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && !colFamiliesForPColumnsToBeAdded.isEmpty();
@@ -2458,6 +2462,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
logger.warn("Could not check for Phoenix SYSTEM tables, assuming they exist and are properly configured");
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, getProps()).getName());
success = true;
+ } else if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), NamespaceNotFoundException.class))) {
+ // This exception is only possible if SYSTEM namespace mapping is enabled and SYSTEM namespace is missing
+ // It implies that SYSTEM tables are not created and hence we shouldn't provide a connection
+ AccessDeniedException ade = new AccessDeniedException("Insufficient permissions to create SYSTEM namespace and SYSTEM Tables");
+ initializationException = ServerUtil.parseServerException(ade);
} else {
initializationException = e;
}
@@ -2469,8 +2478,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227)
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
ConnectionQueryServicesImpl.this.getProps())) {
- metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
- + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+ try {
+ metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+ } catch (NewerSchemaAlreadyExistsException e) {
+ // Older clients with appropriate perms may try getting a new connection
+ // This results in NewerSchemaAlreadyExistsException, so we can safely ignore it here
+ } catch (PhoenixIOException e) {
+ if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) {
+ // Ignore ADE
+ } else {
+ throw e;
+ }
+ }
}
if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
createOtherSystemTables(metaConnection, hBaseAdmin);
@@ -2528,7 +2548,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
final TableName mutexTableName = SchemaUtil.getPhysicalTableName(
PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props);
List<TableName> systemTables = getSystemTableNames(admin);
- if (systemTables.contains(mutexTableName)) {
+ if (systemTables.contains(mutexTableName) || admin.tableExists( TableName.valueOf(
+ PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME))) {
logger.debug("System mutex table already appears to exist, not creating it");
return;
}
@@ -2545,8 +2566,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
put.add(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED);
sysMutexTable.put(put);
}
- } catch (TableExistsException e) {
+ } catch (TableExistsException | AccessDeniedException e) {
// Ignore
+ }catch(PhoenixIOException e){
+ if(e.getCause()!=null && e.getCause() instanceof AccessDeniedException)
+ {
+ //Ignore
+ }else{
+ throw e;
+ }
}
}
@@ -3573,6 +3601,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
+ /**
+  * Updates the index state and, when the statement carries properties, first sends the table
+  * descriptor derived from those properties to HBase.
+  *
+  * @param tableMetaData mutations forwarded to the two-argument {@code updateIndexState}
+  * @param parentTableName forwarded to the two-argument {@code updateIndexState}
+  * @param stmtProperties properties from the statement; when {@code null} this method simply
+  *        delegates to the two-argument overload
+  * @param table table passed to {@code separateAndValidateProperties} to derive the descriptor
+  * @return the result of the two-argument {@code updateIndexState}
+  * @throws SQLException propagated from property validation, the descriptor update or the
+  *         index state update
+  */
@Override
+ public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName, Map<String, List<Pair<String,Object>>> stmtProperties, PTable table) throws SQLException {
+     if (stmtProperties == null) {
+         return updateIndexState(tableMetaData, parentTableName);
+     }
+
+     Map<String, Object> tableProps = new HashMap<String, Object>();
+     Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = separateAndValidateProperties(table, stmtProperties, new HashSet<String>(), tableProps);
+     // Only the second element of the pair is consumed here; nothing in this method reads
+     // the first one, so it is no longer collected into a set.
+     HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
+     Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
+     if (tableDescriptor != null) {
+         tableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
+         tableDescriptors.add(tableDescriptor);
+     }
+     // NOTE(review): the meaning of the boolean flag is defined by sendHBaseMetaData, which is
+     // not visible in this hunk - confirm before changing it.
+     sendHBaseMetaData(tableDescriptors, true);
+     return updateIndexState(tableMetaData, parentTableName);
+ }
+
+ @Override
public long createSequence(String tenantId, String schemaName, String sequenceName,
long startWith, long incrementBy, long cacheSize, long minValue, long maxValue,
boolean cycle, long timestamp) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index f15e0b1..3154f86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -383,6 +383,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
+ /**
+  * Connectionless variant: {@code stmtProperties} and {@code table} are not used; the call is
+  * answered by the two-argument {@code updateIndexState}.
+  *
+  * @return the result of {@code updateIndexState(tableMetadata, parentTableName)}
+  * @throws SQLException propagated from the two-argument overload
+  */
@Override
+ public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata,
+         String parentTableName, Map<String, List<Pair<String, Object>>> stmtProperties,
+         PTable table) throws SQLException {
+     final MetaDataMutationResult result = updateIndexState(tableMetadata, parentTableName);
+     return result;
+ }
+
+ @Override
public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 6c464eb..05d1af6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -135,7 +135,13 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException {
return getDelegate().updateIndexState(tableMetadata, parentTableName);
}
-
+
+ /**
+  * Forwards the four-argument index state update, with all arguments unchanged, to the
+  * wrapped delegate.
+  *
+  * @return whatever the delegate returns
+  * @throws SQLException propagated from the delegate
+  */
+ @Override
+ public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata,
+         String parentTableName, Map<String, List<Pair<String, Object>>> stmtProperties,
+         PTable table) throws SQLException {
+     final MetaDataMutationResult delegated =
+             getDelegate().updateIndexState(tableMetadata, parentTableName, stmtProperties, table);
+     return delegated;
+ }
+
@Override
public void init(String url, Properties props) throws SQLException {
getDelegate().init(url, props);