You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/17 10:17:48 UTC
[2/3] git commit: PHOENIX-1361 Sequence value goes backwards if
sequence validated before reserved
PHOENIX-1361 Sequence value goes backwards if sequence validated before reserved
Conflicts:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b85a9ba6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b85a9ba6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b85a9ba6
Branch: refs/heads/4.0
Commit: b85a9ba6eaa78fc4871b5a7068db52519bfeeb63
Parents: 7e063d6
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Oct 16 23:04:36 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 17 01:11:04 2014 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/SequenceIT.java | 41 +++++++
.../apache/phoenix/compile/SequenceManager.java | 28 +++--
.../phoenix/expression/DelegateExpression.java | 108 +++++++++++++++++++
.../phoenix/parse/SequenceValueParseNode.java | 14 ++-
.../query/ConnectionQueryServicesImpl.java | 12 +--
.../org/apache/phoenix/schema/Sequence.java | 32 +++---
6 files changed, 203 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b85a9ba6/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index f8673ce..4f2b9a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -1205,4 +1206,44 @@ public class SequenceIT extends BaseClientManagedTimeIT {
+ unexpectedExceptions + " missing exceptions : " + missingExceptions);
}
}
+/**
+ * Test added with PHOENIX-1361 ("Sequence value goes backwards if sequence
+ * validated before reserved"). It references sequence foo.bar from statements
+ * that are issued before any value is stored or read (an EXPLAIN, and a
+ * prepared statement whose parameter metadata is fetched) and then asserts
+ * that the values handed out afterwards are 1, 2, 3, 4 in order.
+ * NOTE(review): nextConnection() and conn are defined outside this method;
+ * presumably each call replaces conn with a fresh connection - confirm.
+ */
+ @Test
+ public void testValidateBeforeReserve() throws Exception {
+ nextConnection();
+ conn.createStatement().execute(
+ "CREATE TABLE foo (k VARCHAR PRIMARY KEY, l BIGINT)");
+ conn.createStatement().execute(
+ "CREATE SEQUENCE foo.bar");
+// First touch of the sequence is an EXPLAIN (presumably validate-only, TODO confirm); two UPSERTs then each take NEXT VALUE.
+ nextConnection();
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT NEXT VALUE FOR foo.bar FROM foo");
+ assertTrue(rs.next());
+ conn.createStatement().execute(
+ "UPSERT INTO foo VALUES ('a', NEXT VALUE FOR foo.bar)");
+ conn.createStatement().execute(
+ "UPSERT INTO foo VALUES ('b', NEXT VALUE FOR foo.bar)");
+ conn.commit();
+// Rows 'a' and 'b' are expected to hold 1 and 2, and to be the only rows.
+ nextConnection();
+ rs = conn.createStatement().executeQuery("SELECT * FROM foo");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(1,rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(2,rs.getLong(2));
+ assertFalse(rs.next());
+// getParameterMetaData() is called before executeQuery(); the query over the two rows is then expected to yield 3 and 4.
+ nextConnection();
+ PreparedStatement stmt = conn.prepareStatement("SELECT NEXT VALUE FOR foo.bar FROM foo");
+ ParameterMetaData md = stmt.getParameterMetaData();
+ assertEquals(0,md.getParameterCount());
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(3, rs.getLong(1));
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b85a9ba6/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index 9be45a4..03091c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.expression.BaseTerminalExpression;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.parse.SequenceValueParseNode.Op;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PDataType;
@@ -37,6 +38,7 @@ import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.tuple.DelegateTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SchemaUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,8 +48,8 @@ public class SequenceManager {
private int[] sequencePosition;
private List<SequenceKey> nextSequences;
private List<SequenceKey> currentSequences;
- private Map<SequenceKey,SequenceValueExpression> sequenceMap;
- private BitSet isNextSequence;
+ private final Map<SequenceKey,SequenceValueExpression> sequenceMap = Maps.newHashMap();
+ private final BitSet isNextSequence = new BitSet();
public SequenceManager(PhoenixStatement statement) {
this.statement = statement;
@@ -117,10 +119,6 @@ public class SequenceManager {
}
public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) {
- if (sequenceMap == null) {
- sequenceMap = Maps.newHashMap();
- isNextSequence = new BitSet();
- }
PName tenantName = statement.getConnection().getTenantId();
String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
@@ -128,11 +126,13 @@ public class SequenceManager {
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
- expression = new SequenceValueExpression(index);
+ expression = new SequenceValueExpression(key, node.getOp(), index);
sequenceMap.put(key, expression);
+ } else if (expression.op != node.getOp()){
+ expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex());
}
// If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT
- if (node.getOp() == SequenceValueParseNode.Op.NEXT_VALUE) {
+ if (node.getOp() == Op.NEXT_VALUE) {
isNextSequence.set(expression.getIndex());
}
@@ -140,7 +140,7 @@ public class SequenceManager {
}
public void validateSequences(Sequence.ValueOp action) throws SQLException {
- if (sequenceMap == null || sequenceMap.isEmpty()) {
+ if (sequenceMap.isEmpty()) {
return;
}
int maxSize = sequenceMap.size();
@@ -174,9 +174,13 @@ public class SequenceManager {
}
private class SequenceValueExpression extends BaseTerminalExpression {
+ private final SequenceKey key;
+ private final Op op;
private final int index;
- private SequenceValueExpression(int index) {
+ private SequenceValueExpression(SequenceKey key, Op op, int index) {
+ this.key = key;
+ this.op = op;
this.index = index;
}
@@ -212,5 +216,9 @@ public class SequenceManager {
return true;
}
+ @Override
+ public String toString() { // renders this reference as "<op name> VALUE FOR <name>", where <op name> is op.getName()
+ return op.getName() + " VALUE FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName()); // <name> comes from SchemaUtil.getTableName (not shown here); presumably "schema.sequence" - TODO confirm
+ }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b85a9ba6/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java
new file mode 100644
index 0000000..87a0bc0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+/**
+ * {@link Expression} implementation that wraps another expression and forwards
+ * every interface method to it unchanged. The class adds no state or behavior
+ * of its own beyond holding the wrapped instance.
+ * NOTE(review): presumably meant as a base class for wrappers that override
+ * only selected methods - confirm against its users, which are not in this file.
+ */
+public class DelegateExpression implements Expression {
+ private final Expression delegate; // the wrapped expression; target of every forwarded call
+/**
+ * Wraps the given expression.
+ *
+ * @param delegate expression that will receive every call; it is not
+ *        null-checked here, so a null would only fail on the first forwarded call
+ */
+ public DelegateExpression(Expression delegate) {
+ this.delegate = delegate;
+ }
+/** Returns the wrapped expression's {@code isNullable()} result. */
+ @Override
+ public boolean isNullable() {
+ return delegate.isNullable();
+ }
+/** Returns the wrapped expression's data type. */
+ @Override
+ public PDataType getDataType() {
+ return delegate.getDataType();
+ }
+/** Returns the wrapped expression's {@code getMaxLength()} result. */
+ @Override
+ public Integer getMaxLength() {
+ return delegate.getMaxLength();
+ }
+/** Returns the wrapped expression's {@code getScale()} result. */
+ @Override
+ public Integer getScale() {
+ return delegate.getScale();
+ }
+/** Returns the wrapped expression's sort order. */
+ @Override
+ public SortOrder getSortOrder() {
+ return delegate.getSortOrder();
+ }
+/**
+ * Has the wrapped expression read its fields from {@code input}; this wrapper
+ * reads nothing for itself.
+ */
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ delegate.readFields(input);
+ }
+/**
+ * Has the wrapped expression write itself to {@code output}; this wrapper
+ * writes nothing for itself.
+ */
+ @Override
+ public void write(DataOutput output) throws IOException {
+ delegate.write(output);
+ }
+/** Evaluates the wrapped expression against {@code tuple} with the caller's {@code ptr} and returns its result. */
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ return delegate.evaluate(tuple, ptr);
+ }
+/**
+ * Passes the visitor straight to the wrapped expression, so the visitor is
+ * accepted by the wrapped expression rather than by this wrapper.
+ */
+ @Override
+ public <T> T accept(ExpressionVisitor<T> visitor) {
+ return delegate.accept(visitor);
+ }
+/** Returns the wrapped expression's children. */
+ @Override
+ public List<Expression> getChildren() {
+ return delegate.getChildren();
+ }
+/** Calls {@code reset()} on the wrapped expression. */
+ @Override
+ public void reset() {
+ delegate.reset();
+ }
+/** Returns the wrapped expression's {@code isStateless()} result. */
+ @Override
+ public boolean isStateless() {
+ return delegate.isStateless();
+ }
+/** Returns the wrapped expression's determinism. */
+ @Override
+ public Determinism getDeterminism() {
+ return delegate.getDeterminism();
+ }
+/** Returns the wrapped expression's {@code requiresFinalEvaluation()} result. */
+ @Override
+ public boolean requiresFinalEvaluation() {
+ return delegate.requiresFinalEvaluation();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b85a9ba6/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
index 227d78b..f29d79e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
@@ -22,7 +22,19 @@ import java.sql.SQLException;
public class SequenceValueParseNode extends TerminalParseNode {
- public enum Op {NEXT_VALUE, CURRENT_VALUE};
+ public enum Op { // which value of a sequence is being referenced: the next one or the current one
+ NEXT_VALUE("NEXT"),
+ CURRENT_VALUE("CURRENT");
+// Keyword text tied to the constant ("NEXT" or "CURRENT"), exactly as passed to the constructor.
+ private final String name;
+ Op(String name) {
+ this.name = name;
+ }
+ public String getName() { // returns the keyword text given at construction
+ return name;
+ };
+// NOTE(review): the ';' after getName()'s closing brace is a redundant empty declaration; harmless, but it could be dropped.
+ }
private final TableName tableName;
private final Op op;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b85a9ba6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 7b1278a..c8defa2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1782,7 +1782,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*/
@Override
public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 0, action);
+ incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, action);
}
/**
@@ -1798,11 +1798,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*/
@Override
public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 1, Sequence.ValueOp.RESERVE_SEQUENCE);
+ incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
}
@SuppressWarnings("deprecation")
- private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, int factor, Sequence.ValueOp action) throws SQLException {
+ private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size());
for (SequenceKey key : keys) {
Sequence newSequences = new Sequence(key);
@@ -1823,11 +1823,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (int i = 0; i < sequences.size(); i++) {
Sequence sequence = sequences.get(i);
try {
- values[i] = sequence.incrementValue(timestamp, factor, action);
+ values[i] = sequence.incrementValue(timestamp, op);
} catch (EmptySequenceCacheException e) {
indexes[toIncrementList.size()] = i;
toIncrementList.add(sequence);
- Increment inc = sequence.newIncrement(timestamp, action);
+ Increment inc = sequence.newIncrement(timestamp, op);
incrementBatch.add(inc);
} catch (SQLException e) {
exceptions[i] = e;
@@ -1864,7 +1864,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Sequence sequence = toIncrementList.get(i);
Result result = (Result)resultObjects[i];
try {
- values[indexes[i]] = sequence.incrementValue(result, factor);
+ values[indexes[i]] = sequence.incrementValue(result, op);
} catch (SQLException e) {
exceptions[indexes[i]] = e;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b85a9ba6/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index ae822e3..08af961 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -58,7 +58,7 @@ import com.google.common.math.LongMath;
public class Sequence {
public static final int SUCCESS = 0;
- public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE};
+ public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE, INCREMENT_SEQUENCE};
public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
// create empty Sequence key values used while creating a sequence row
@@ -140,10 +140,10 @@ public class Sequence {
return value.isDeleted ? null : value;
}
- private long increment(SequenceValue value, int factor) throws SQLException {
- boolean increasingSeq = value.incrementBy > 0;
+ private long increment(SequenceValue value, ValueOp op) throws SQLException {
+ boolean increasingSeq = value.incrementBy > 0 && op != ValueOp.VALIDATE_SEQUENCE;
// check if the sequence has already reached the min/max limit
- if (value.limitReached) {
+ if (value.limitReached && op != ValueOp.VALIDATE_SEQUENCE) {
if (value.cycle) {
value.limitReached=false;
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
@@ -157,12 +157,11 @@ public class Sequence {
}
long returnValue = value.currentValue;
- if (factor != 0) {
+ if (op == ValueOp.INCREMENT_SEQUENCE) {
boolean overflowOrUnderflow=false;
// advance currentValue while checking for overflow
try {
- long incrementValue = LongMath.checkedMultiply(value.incrementBy, factor);
- value.currentValue = LongMath.checkedAdd(value.currentValue, incrementValue);
+ value.currentValue = LongMath.checkedAdd(value.currentValue, value.incrementBy);
} catch (ArithmeticException e) {
overflowOrUnderflow = true;
}
@@ -177,18 +176,18 @@ public class Sequence {
return returnValue;
}
- public long incrementValue(long timestamp, int factor, ValueOp action) throws SQLException {
+ public long incrementValue(long timestamp, ValueOp op) throws SQLException {
SequenceValue value = findSequenceValue(timestamp);
if (value == null) {
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
}
if (value.currentValue == value.nextValue) {
- if (action == ValueOp.VALIDATE_SEQUENCE) {
+ if (op == ValueOp.VALIDATE_SEQUENCE) {
return value.currentValue;
}
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
}
- return increment(value, factor);
+ return increment(value, op);
}
public List<Append> newReturns() {
@@ -246,7 +245,7 @@ public class Sequence {
return key;
}
- public long incrementValue(Result result, int factor) throws SQLException {
+ public long incrementValue(Result result, ValueOp op) throws SQLException {
// In this case, we don't definitely know the timestamp of the deleted sequence,
// but we know anything older is likely deleted. Worse case, we remove a sequence
// from the cache that we shouldn't have which will cause a gap in sequence values.
@@ -267,9 +266,9 @@ public class Sequence {
.build().buildException();
}
// If we found the sequence, we update our cache with the new value
- SequenceValue value = new SequenceValue(result);
+ SequenceValue value = new SequenceValue(result, op);
insertSequenceValue(value);
- return increment(value, factor);
+ return increment(value, op);
}
@SuppressWarnings("deprecation")
@@ -410,7 +409,7 @@ public class Sequence {
return this.incrementBy == 0;
}
- public SequenceValue(Result r) {
+ public SequenceValue(Result r, ValueOp op) {
KeyValue currentValueKV = getCurrentValueKV(r);
KeyValue incrementByKV = getIncrementByKV(r);
KeyValue cacheSizeKV = getCacheSizeKV(r);
@@ -425,7 +424,10 @@ public class Sequence {
this.maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getValueArray(), maxValueKV.getValueOffset(), SortOrder.getDefault());
this.cycle = (Boolean)PDataType.BOOLEAN.toObject(cycleKV.getValueArray(), cycleKV.getValueOffset(), cycleKV.getValueLength());
this.limitReached = false;
- currentValue = nextValue - incrementBy * cacheSize;
+ currentValue = nextValue;
+ if (op != ValueOp.VALIDATE_SEQUENCE) {
+ currentValue -= incrementBy * cacheSize;
+ }
}
}