You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/07/10 05:56:59 UTC
[2/2] phoenix git commit: PHOENIX-1954 Reserve chunks of numbers for
a sequence (Jan Fernando)
PHOENIX-1954 Reserve chunks of numbers for a sequence (Jan Fernando)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3b1bfa0d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3b1bfa0d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3b1bfa0d
Branch: refs/heads/master
Commit: 3b1bfa0d7b83f0b9ee0ad535d6e1f99777c14cb6
Parents: 984e622
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Jul 9 20:49:03 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Jul 9 20:56:48 2015 -0700
----------------------------------------------------------------------
.../end2end/SequenceBulkAllocationIT.java | 1286 ++++++++++++++++++
phoenix-core/src/main/antlr3/PhoenixSQL.g | 5 +-
.../apache/phoenix/compile/SequenceManager.java | 70 +-
.../compile/SequenceValueExpression.java | 14 +-
.../coprocessor/SequenceRegionObserver.java | 64 +-
.../phoenix/exception/SQLExceptionCode.java | 2 +
.../apache/phoenix/jdbc/PhoenixStatement.java | 10 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 6 +-
.../phoenix/parse/SequenceValueParseNode.java | 8 +-
.../phoenix/query/ConnectionQueryServices.java | 5 +-
.../query/ConnectionQueryServicesImpl.java | 23 +-
.../query/ConnectionlessQueryServicesImpl.java | 20 +-
.../query/DelegateConnectionQueryServices.java | 13 +-
.../apache/phoenix/schema/MetaDataClient.java | 2 +-
.../org/apache/phoenix/schema/Sequence.java | 107 +-
.../phoenix/schema/SequenceAllocation.java | 48 +
.../org/apache/phoenix/util/SequenceUtil.java | 47 +-
.../phoenix/schema/SequenceAllocationTest.java | 59 +
.../apache/phoenix/util/SequenceUtilTest.java | 54 +
19 files changed, 1763 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java
new file mode 100644
index 0000000..e7db1ec
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java
@@ -0,0 +1,1286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Suite of integration tests that validate that Bulk Allocation of Sequence values
+ * using the NEXT <n> VALUES FOR <seq> syntax works as expected and interacts
+ * correctly with NEXT VALUE FOR <seq> and CURRENT VALUE FOR <seq>.
+ *
+ * All tests are run with both a generic connection and a multi-tenant connection.
+ *
+ */
+@RunWith(Parameterized.class)
+public class SequenceBulkAllocationIT extends BaseClientManagedTimeIT {
+
+ private static final long BATCH_SIZE = 3;
+ private static final String SELECT_NEXT_VALUE_SQL =
+ "SELECT NEXT VALUE FOR %s FROM SYSTEM.\"SEQUENCE\" LIMIT 1";
+ private static final String SELECT_CURRENT_VALUE_SQL =
+ "SELECT CURRENT VALUE FOR %s FROM SYSTEM.\"SEQUENCE\" LIMIT 1";
+ private static final String CREATE_SEQUENCE_NO_MIN_MAX_TEMPLATE =
+ "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s CACHE %s";
+ private static final String CREATE_SEQUENCE_WITH_MIN_MAX_TEMPLATE =
+ "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CACHE %s";
+ private static final String CREATE_SEQUENCE_WITH_MIN_MAX_AND_CYCLE_TEMPLATE =
+ "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CYCLE CACHE %s";
+
+
+ private Connection conn;
+ private String tenantId;
+
+ public SequenceBulkAllocationIT(String tenantId) {
+ this.tenantId = tenantId;
+ }
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String, String> props = getDefaultProps();
+ // Must update config before starting server
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // close any open connection between tests, so that connections are not leaked
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ @Parameters
+ public static Object[] data() {
+ return new Object[] {null, "tenant1"};
+ }
+
+
+ @Test
+ public void testSequenceParseNextValuesWithNull() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT NULL VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("null is not allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+ @Test
+ public void testSequenceParseNextValuesWithNonNumber() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT '89b' VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Only integers and longs are allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+
+ @Test
+ public void testSequenceParseNextValuesWithNegativeNumber() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT '-1' VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("null is not allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+ @Test
+ public void testParseNextValuesSequenceWithZeroAllocated() throws Exception {
+ nextConnection();
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT 0 VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Only integers and longs are allowed to be used for <n> in NEXT <n> VALUES FOR <seq>");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+
+ @Test
+ public void testNextValuesForSequenceWithNoAllocatedValues() throws Exception {
+ // Create Sequence
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(1)
+ .numAllocated(100).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ final int currentValueAfterAllocation = 100;
+ reserveSlotsInBulkAndAssertValue(1, props.numAllocated);
+ assertExpectedStateInSystemSequence(props, 101);
+ assertExpectedNumberOfValuesAllocated(1, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(101);
+ }
+
+ @Test
+ /**
+ * Validates we can invoke NEXT <n> VALUES FOR using bind vars.
+ */
+ public void testNextValuesForSequenceUsingBinds() throws Exception {
+
+ // Create Sequence
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(1)
+ .numAllocated(100).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Allocate 100 slots using SQL with Bind Params and a PreparedStatement
+ final int currentValueAfterAllocation = 100;
+ reserveSlotsInBulkUsingBindsAndAssertValue(1,props.numAllocated);
+ assertExpectedStateInSystemSequence(props, 101);
+ assertExpectedNumberOfValuesAllocated(1, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(101);
+ }
+
+
+ @Test
+ public void testNextValuesForSequenceWithPreviouslyAllocatedValues() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+
+ @Test
+ /**
+ * Validates that if we close a connection after performing
+ * NEXT <n> VALUES FOR <seq> the values are correctly returned from
+ * the latest batch.
+ */
+ public void testConnectionCloseReturnsSequenceValuesCorrectly() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(2).startsWith(1).cacheSize(100)
+ .numAllocated(100).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(3);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 399;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 201;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+
+ // Close the Connection
+ conn.close();
+
+ // Test that sequence, doesn't have gaps after closing the connection
+ nextConnection();
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ assertExpectedCurrentValueForSequence(nextValueAfterAllocation);
+
+ }
+
+ @Test
+ /**
+ * Validates that calling NEXT <n> VALUES FOR <seq> works correctly with UPSERT.
+ */
+ public void testNextValuesForSequenceWithUpsert() throws Exception {
+
+ // Create Sequence
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+
+ // Create TABLE
+ nextGenericConnection();
+ conn.createStatement().execute("CREATE TABLE bulkalloc.test ( id INTEGER NOT NULL PRIMARY KEY)");
+ nextConnection();
+
+ // Grab batch from Sequence
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+ assertExpectedStateInSystemSequence(props, 101);
+
+
+ // Perform UPSERT and validate Sequence was incremented as expected
+ conn.createStatement().execute("UPSERT INTO bulkalloc.test (id) VALUES (NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha)");
+ conn.commit();
+ assertExpectedStateInSystemSequence(props, 1101);
+
+ // SELECT values out and verify
+ nextConnection();
+ String query = "SELECT id, NEXT VALUE FOR bulkalloc.alpha FROM bulkalloc.test";
+ ResultSet rs = conn.prepareStatement(query).executeQuery();
+ assertTrue(rs.next());
+ assertEquals(101, rs.getInt(1)); // Threw out cache of 100, incremented by 1000
+ assertEquals(1101, rs.getInt(2));
+ assertFalse(rs.next());
+ }
+
+
+
+
+ @Test
+ public void testNextValuesForSequenceWithIncrementBy() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(3).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(4);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 3298;
+ int startValueAfterAllocation = 301;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(3298);
+ assertExpectedNextValueForSequence(3301);
+ }
+
+ @Test
+ public void testNextValuesForSequenceWithNegativeIncrementBy() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(-1).startsWith(2000).cacheSize(100)
+ .numAllocated(1000).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(2000);
+ assertExpectedCurrentValueForSequence(2000);
+ assertExpectedNextValueForSequence(1999);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 901;
+ int startValueAfterAllocation = 1900;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(901);
+ assertExpectedNextValueForSequence(900);
+ }
+
+ @Test
+ public void testNextValuesForSequenceWithNegativeIncrementByGreaterThanOne() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(-5).startsWith(2000).cacheSize(100)
+ .numAllocated(100).build();
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Pull first batch from Sequence
+ assertExpectedNextValueForSequence(2000);
+ assertExpectedCurrentValueForSequence(2000);
+ assertExpectedNextValueForSequence(1995);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 1005;
+ int startValueAfterAllocation = 1500;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(1005);
+ assertExpectedNextValueForSequence(1000);
+ }
+
+
+ @Test
+ /**
+ * Validates that for NEXT <n> VALUES FOR if you try an allocate more slots such that that
+ * we exceed the max value of the sequence we throw an exception. Allocating sequence values in bulk
+ * should be an all or nothing operation - if the operation succeeds clients are guaranteed that they
+ * have access to all slots requested.
+ */
+ public void testNextValuesForSequenceExceedsMaxValue() throws Exception {
+ final SequenceProperties sequenceProps =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100)
+ .numAllocated(1000).minValue(100).maxValue(900).build();
+
+ nextConnection();
+ createSequenceWithMinMax(sequenceProps);
+ nextConnection();
+
+ // Pull first batch from the sequence
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(101);
+
+ // Attempt to bulk Allocate more slots than available
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + sequenceProps.numAllocated
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Invoking SELECT NEXT VALUES should have thrown Reached Max Value Exception");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+
+ // Assert sequence didn't advance
+ assertExpectedCurrentValueForSequence(101);
+ assertExpectedNextValueForSequence(102);
+ }
+
+ @Test
+ /**
+ * Validates that for NEXT <n> VALUES FOR if you try an allocate more slots such that that
+ * we exceed the min value of the sequence we throw an exception. Allocating sequence values in bulk
+ * should be an all or nothing operation - if the operation succeeds clients are guaranteed that they
+ * have access to all slots requested.
+ */
+ public void testNextValuesForSequenceExceedsMinValue() throws Exception {
+ final SequenceProperties sequenceProps =
+ new SequenceProperties.Builder().incrementBy(-5).startsWith(900).cacheSize(100)
+ .numAllocated(160).minValue(100).maxValue(900).build();
+
+ nextConnection();
+ createSequenceWithMinMax(sequenceProps);
+ nextConnection();
+
+ // Pull first batch from the sequence
+ assertExpectedNextValueForSequence(900);
+ assertExpectedCurrentValueForSequence(900);
+ assertExpectedNextValueForSequence(895);
+
+ // Attempt to bulk Allocate more slots than available
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + sequenceProps.numAllocated
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Invoking SELECT NEXT VALUES should have thrown Reached Max Value Exception");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+
+ // Assert sequence didn't advance (we still throw out the cached values)
+ assertExpectedCurrentValueForSequence(895);
+ assertExpectedNextValueForSequence(890);
+ }
+
+
+ @Test
+ /**
+ * Validates that if we don't exceed the limit bulk allocation works with sequences with a
+ * min and max defined.
+ */
+ public void testNextValuesForSequenceWithMinMaxDefined() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(1000).minValue(100).maxValue(6000).build();
+
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(105);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 5595;
+ int startValueAfterAllocation = 600;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(5595);
+ assertExpectedNextValueForSequence(5600);
+ }
+
+ @Test
+ public void testNextValuesForSequenceWithDefaultMax() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100)
+ .numAllocated(Long.MAX_VALUE - 100).build();
+
+ // Create Sequence
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ long currentValueAfterAllocation = 100;
+ long startValueAfterAllocation = Long.MAX_VALUE;
+ reserveSlotsInBulkAndAssertValue(currentValueAfterAllocation, props.numAllocated);
+ assertExpectedStateInSystemSequence(props, startValueAfterAllocation);
+
+ // Try and get next value
+ try {
+ conn.createStatement().executeQuery(String.format(SELECT_NEXT_VALUE_SQL, "bulkalloc.alpha"));
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+ @Test
+ /**
+ * Validates that if our current or start value is > 0 and we ask for Long.MAX
+ * and overflow to the next value, the correct Exception is thrown when
+ * the expression is evaluated.
+ */
+ public void testNextValuesForSequenceOverflowAllocation() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100)
+ .numAllocated(Long.MAX_VALUE).build();
+
+ // Create Sequence
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + Long.MAX_VALUE
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+ }
+
+
+ @Test
+ /**
+ * Validates that specifying an bulk allocation less than the size of the cache defined on the sequence works
+ * as expected.
+ */
+ public void testNextValuesForSequenceAllocationLessThanCacheSize() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(50).minValue(100).maxValue(6000).build();
+
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(105);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 355;
+ int startValueAfterAllocation = 110;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertExpectedStateInSystemSequence(props, 600);
+ assertExpectedNumberOfValuesAllocated(startValueAfterAllocation, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+
+ // Assert standard Sequence Operations return expected values
+ // 105 + (50 * 5) = 355
+ assertExpectedCurrentValueForSequence(355);
+ assertExpectedNextValueForSequence(360);
+ assertExpectedNextValueForSequence(365);
+ assertExpectedNextValueForSequence(370);
+ }
+
+ @Test
+ /**
+ * Validates that specifying an bulk allocation less than the size of the cache defined on the sequence works
+ * as expected if we don't have enough values in the cache to support the allocation.
+ */
+ public void testNextValuesForInsufficentCacheValuesAllocationLessThanCacheSize() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(50).minValue(100).maxValue(6000).build();
+
+ nextConnection();
+ createSequenceWithMinMax(props);
+ nextConnection();
+
+ // Allocate 51 slots, only 49 will be left
+ int currentValueAfter51Allocations = 355; // 100 + 51 * 5
+ for (int i = 100; i <= currentValueAfter51Allocations; i = i + 5) {
+ assertExpectedNextValueForSequence(i);
+ }
+ assertExpectedCurrentValueForSequence(currentValueAfter51Allocations);
+
+ // Bulk Allocate 50 Sequence Slots which greater than amount left in cache
+ // This should throw away rest of the cache, and allocate the request slot
+ // from the next start value
+ int currentValueAfterAllocation = 845;
+ int startValueAfterAllocation = 600;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(845);
+ assertExpectedNextValueForSequence(850);
+ assertExpectedNextValueForSequence(855);
+ assertExpectedNextValueForSequence(860);
+ }
+
+ @Test
+ /**
+ * Validates that for NEXT <n> VALUES FOR is not supported on Sequences that have the
+ * CYCLE flag set to true.
+ */
+ public void testNextValuesForSequenceWithCycles() throws Exception {
+ final SequenceProperties sequenceProps =
+ new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100)
+ .numAllocated(1000).minValue(100).maxValue(900).build();
+
+ nextConnection();
+ createSequenceWithMinMaxAndCycle(sequenceProps);
+ nextConnection();
+
+ // Full first batch from the sequence
+ assertExpectedNextValueForSequence(100);
+ assertExpectedCurrentValueForSequence(100);
+ assertExpectedNextValueForSequence(105);
+
+ // Attempt to bulk Allocate more slots than available
+ try {
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + sequenceProps.numAllocated
+ + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1");
+ fail("Invoking SELECT NEXT VALUES should have failed as operation is not supported for sequences with Cycles.");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getNextException() == null);
+ }
+
+ // Assert sequence didn't advance
+ assertExpectedCurrentValueForSequence(105);
+ assertExpectedNextValueForSequence(110);
+ assertExpectedNextValueForSequence(115);
+ }
+
+ @Test
+ /**
+ * Validates that if we have multiple NEXT <n> VALUES FOR <seq> expression and the
+ * CURRENT VALUE FOR expression work correctly when used in the same statement.
+ */
+ public void testCurrentValueForAndNextValuesForExpressionsForSameSequence() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT CURRENT VALUE FOR bulkalloc.alpha, NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+ int currentValueFor = rs.getInt(1);
+ int nextValuesFor = rs.getInt(2);
+ assertEquals("Expected the next value to be first value reserved", startValueAfterAllocation, nextValuesFor);
+ assertEquals("Expected current value to be the same as next value", startValueAfterAllocation, currentValueFor);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+ @Test
+ /**
+ * Validates that if we have multiple NEXT <n> VALUES FOR <seq> expressions for the *same* sequence
+ * in a statement we only process the one which has the highest value of <n> and return the start
+ * value for that for all expressions.
+ */
+ public void testMultipleNextValuesForExpressionsForSameSequence() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots - One for 5 and one for 1000, 1000 should have precedence
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT 5 VALUES FOR bulkalloc.alpha, NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ int firstValue = rs.getInt(1);
+ int secondValue = rs.getInt(2);
+ assertEquals("Expected both expressions to return the same value", firstValue, secondValue);
+ assertEquals("Expected the value returned to be the highest allocation", startValueAfterAllocation, firstValue);
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+ @Test
+ /**
+ * Validates that if we have NEXT VALUE FOR <seq> and NEXT <n> VALUES FOR <seq> expressions for the *same* sequence
+ * in a statement we only process way and honor the value of the highest value of <n>, where for
+ * NEXT VALUE FOR <seq> is assumed to be 1.
+ */
+ public void testMultipleDifferentExpressionsForSameSequence() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Pull First Batch from Sequence
+ assertExpectedNextValueForSequence(1);
+
+ // Bulk Allocate Sequence Slots and Get Next Value in Same Statement
+ int currentValueAfterAllocation = 1100;
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ int startValueAfterAllocation = 101;
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT VALUE FOR bulkalloc.alpha, "
+ + "NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha, "
+ + "CURRENT VALUE FOR bulkalloc.alpha, "
+ + "NEXT 999 VALUES FOR bulkalloc.alpha "
+ + "FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation);
+
+ // Assert all values returned are the same
+ // Expect them to be the highest value from NEXT VALUE or NEXT <n> VALUES FOR
+ int previousVal = 0;
+ for (int i = 1; i <= 4; i++) {
+ int currentVal = rs.getInt(i);
+ if (i != 1) {
+ assertEquals(
+ "Expected all NEXT VALUE FOR and NEXT <n> VALUES FOR expressions to return the same value",
+ previousVal, currentVal);
+ }
+ previousVal = currentVal;
+ }
+
+ // Assert standard Sequence Operations return expected values
+ assertExpectedCurrentValueForSequence(currentValueAfterAllocation);
+ assertExpectedNextValueForSequence(nextValueAfterAllocation);
+ }
+
+
+ @Test
+ /**
+ * Validates that using NEXT <n> VALUES FOR on different sequences in the
+ * same statement with *different* values of <n> works as expected. This
+ * test validates that we keep our numAllocated array and sequence keys in
+ * sync during the sequence management process.
+ */
+ public void testMultipleNextValuesForExpressionsForDifferentSequences() throws Exception {
+
+ nextConnection();
+ conn.createStatement().execute("CREATE SEQUENCE bulkalloc.alpha START WITH 30 INCREMENT BY 3 CACHE 100");
+ conn.createStatement().execute("CREATE SEQUENCE bulkalloc.beta START WITH 100 INCREMENT BY 5 CACHE 50");
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots for Two Sequences
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT 100 VALUES FOR bulkalloc.alpha, NEXT 1000 VALUES FOR bulkalloc.beta FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertEquals(30, rs.getInt(1));
+ assertEquals(100, rs.getInt(2));
+
+ // Assert standard Sequence Operations return expected values
+ for (int i = 330; i < 330 + (2 * 100); i += 3) {
+ assertExpectedCurrentValueForSequence(i - 3, "bulkalloc.alpha");
+ assertExpectedNextValueForSequence(i, "bulkalloc.alpha");
+ }
+
+ for (int i = 5100; i < 5100 + (2 * 1000); i += 5) {
+ assertExpectedCurrentValueForSequence(i - 5, "bulkalloc.beta");
+ assertExpectedNextValueForSequence(i, "bulkalloc.beta");
+ }
+ }
+
+ @Test
+ /**
+ * Validates that calling NEXT <n> VALUES FOR with EXPLAIN PLAN doesn't use
+ * allocate any slots.
+ */
+ public void testExplainPlanValidatesSequences() throws Exception {
+
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(3).startsWith(30).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+
+ nextGenericConnection();
+ conn.createStatement().execute("CREATE TABLE bulkalloc.simpletbl (k BIGINT NOT NULL PRIMARY KEY)");
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ int startValueAfterAllocation = 30;
+ reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated);
+
+ // Execute EXPLAIN PLAN multiple times, which should not change Sequence values
+ for (int i = 0; i < 3; i++) {
+ conn.createStatement().executeQuery("EXPLAIN SELECT NEXT 1000 VALUES FOR bulkalloc.alpha FROM bulkalloc.simpletbl");
+ }
+
+ // Validate the current value was not advanced and was the starting value
+ assertExpectedStateInSystemSequence(props, 3030);
+
+ // Assert standard Sequence Operations return expected values
+ int startValue = 3030;
+ for (int i = startValue; i < startValue + (2 * props.cacheSize); i += props.incrementBy) {
+ assertExpectedCurrentValueForSequence(i - props.incrementBy);
+ assertExpectedNextValueForSequence(i);
+ }
+ }
+
+ @Test
+ public void testExplainPlanForNextValuesFor() throws Exception {
+
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(3).startsWith(30).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextGenericConnection();
+ conn.createStatement().execute("CREATE TABLE bulkalloc.simpletbl (k BIGINT NOT NULL PRIMARY KEY)");
+ nextConnection();
+
+ // Execute EXPLAIN PLAN which should not change Sequence values
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT NEXT 1000 VALUES FOR bulkalloc.alpha FROM bulkalloc.simpletbl");
+
+ // Assert output for Explain Plain result is as expected
+ assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER BULKALLOC.SIMPLETBL\n" +
+ " SERVER FILTER BY FIRST KEY ONLY\n" +
+ "CLIENT RESERVE VALUES FROM 1 SEQUENCE", QueryUtil.getExplainPlan(rs));
+ }
+
+
+ /**
+ * Performs a multithreaded test whereby we interleave reads from the result set of
+ * NEXT VALUE FOR and NEXT <n> VALUES FOR to make sure we get expected values with the
+ * following order of execution:
+ *
+ * 1) Execute expression NEXT <n> VALUES FOR <seq>
+ * 2) Execute expression NEXT VALUE FOR <seq>
+ * 3) Read back value from expression NEXT VALUE FOR <seq> via rs.next()
+ * 4) Read back value from expression NEXT <n> VALUES FOR <seq> via rs.next()
+ */
+ public void testNextValuesForMixedWithNextValueForMultiThreaded() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ final long startValueAfterAllocation1 = 101;
+ final long startValueAfterAllocation2 = 1101;
+ final long numSlotToAllocate = props.numAllocated;
+
+ // Setup and run tasks in independent Threads
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Callable<Long> task1 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ latch1.countDown(); // Allows NEXT VALUE FOR thread to proceed
+ latch2.await(); // Waits until NEXT VALUE FOR thread reads and increments currentValue
+ rs.next();
+ return rs.getLong(1);
+ }
+
+ };
+
+ Callable<Long> task2 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ latch1.await(); // Wait for execution of NEXT <n> VALUES FOR expression
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT VALUE FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ long retVal = rs.getLong(1);
+ latch2.countDown(); // Allow NEXT <n> VALUES for thread to completed
+ return retVal;
+ }
+
+ };
+
+ @SuppressWarnings("unchecked")
+ List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 20, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation1, futures.get(0).get(10, TimeUnit.SECONDS).longValue());
+ assertEquals(startValueAfterAllocation2, futures.get(1).get(10, TimeUnit.SECONDS).longValue());
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ @Test
+ public void testMultipleNextValuesWithDiffAllocsForMultiThreaded() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ assertExpectedNextValueForSequence(1);
+ assertExpectedCurrentValueForSequence(1);
+ assertExpectedNextValueForSequence(2);
+
+ // Bulk Allocate Sequence Slots
+ final long startValueAfterAllocation1 = 101;
+ final long startValueAfterAllocation2 = 1101;
+ final long numSlotToAllocate1 = 1000;
+ final long numSlotToAllocate2 = 100;
+
+ // Setup and run tasks in independent Threads
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Callable<Long> task1 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate1 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ latch1.countDown(); // Allows other thread to proceed
+ latch2.await();
+ return rs.getLong(1);
+ }
+
+ };
+
+ Callable<Long> task2 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ latch1.await(); // Wait for other thread to execut of NEXT <n> VALUES FOR expression
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate2 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ long retVal = rs.getLong(1);
+ latch2.countDown(); // Allow thread to completed
+ return retVal;
+ }
+
+ };
+
+ @SuppressWarnings("unchecked")
+ List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 5, TimeUnit.SECONDS);
+
+ // Retrieve value from Thread running NEXT <n> VALUES FOR
+ Long retValue1 = futures.get(0).get(5, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation1, retValue1.longValue());
+
+ // Retrieve value from Thread running NEXT VALUE FOR
+ Long retValue2 = futures.get(1).get(5, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation2, retValue2.longValue());
+
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ @Test
+ public void testMultipleNextValuesWithSameAllocsForMultiThreaded() throws Exception {
+ final SequenceProperties props =
+ new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100)
+ .numAllocated(1000).build();
+
+ nextConnection();
+ createSequenceWithNoMinMax(props);
+ nextConnection();
+
+ // Bulk Allocate Sequence Slots
+ final long startValueAfterAllocation1 = 1;
+ final long startValueAfterAllocation2 = 1001;
+ final long numSlotToAllocate1 = 1000;
+ final long numSlotToAllocate2 = 1000;
+
+ // Setup and run tasks in independent Threads
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Callable<Long> task1 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate1 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ latch1.countDown(); // Allows other thread to proceed
+ latch2.await();
+ rs.next();
+ return rs.getLong(1);
+ }
+
+ };
+
+ Callable<Long> task2 = new Callable<Long>() {
+
+ @Override
+ public Long call() throws Exception {
+ latch1.await(); // Wait for other thread to execut of NEXT <n> VALUES FOR expression
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate2 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ rs.next();
+ long retVal = rs.getLong(1);
+ latch2.countDown(); // Allow thread to completed
+ return retVal;
+ }
+
+ };
+
+ // Because of the way the threads are interleaved the ranges used by each thread will the reserve
+ // of the order to statement execution
+ @SuppressWarnings("unchecked")
+ List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 5, TimeUnit.SECONDS);
+ assertEquals(startValueAfterAllocation2, futures.get(0).get(5, TimeUnit.SECONDS).longValue());
+ assertEquals(startValueAfterAllocation1, futures.get(1).get(5, TimeUnit.SECONDS).longValue());
+
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ // -----------------------------------------------------------------
+ // Private Helper Methods
+ // -----------------------------------------------------------------
+ private void assertBulkAllocationSucceeded(SequenceProperties props,
+ int currentValueAfterAllocation, int startValueAfterAllocation) throws SQLException {
+ int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy;
+ assertExpectedStateInSystemSequence(props, nextValueAfterAllocation);
+ assertExpectedNumberOfValuesAllocated(startValueAfterAllocation, currentValueAfterAllocation, props.incrementBy, props.numAllocated);
+ }
+
+
+ private void createSequenceWithNoMinMax(final SequenceProperties props) throws SQLException {
+ conn.createStatement().execute(
+ String.format(CREATE_SEQUENCE_NO_MIN_MAX_TEMPLATE, props.startsWith,
+ props.incrementBy, props.cacheSize));
+ }
+
+ private void createSequenceWithMinMax(final SequenceProperties props) throws SQLException {
+ conn.createStatement().execute(
+ String.format(CREATE_SEQUENCE_WITH_MIN_MAX_TEMPLATE, props.startsWith,
+ props.incrementBy, props.minValue, props.maxValue, props.cacheSize));
+ }
+
+ private void createSequenceWithMinMaxAndCycle(final SequenceProperties props) throws SQLException {
+ conn.createStatement().execute(
+ String.format(CREATE_SEQUENCE_WITH_MIN_MAX_AND_CYCLE_TEMPLATE, props.startsWith,
+ props.incrementBy, props.minValue, props.maxValue, props.cacheSize));
+ }
+
+ private void reserveSlotsInBulkAndAssertValue(long expectedValue, long numSlotToAllocate)
+ throws SQLException {
+ ResultSet rs =
+ conn.createStatement().executeQuery(
+ "SELECT NEXT " + numSlotToAllocate + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertEquals(expectedValue, rs.getInt(1));
+ }
+
+ private void reserveSlotsInBulkUsingBindsAndAssertValue(int expectedValue, long numSlotToAllocate)
+ throws SQLException {
+ PreparedStatement ps = conn.prepareStatement("SELECT NEXT ? VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\"");
+ ps.setLong(1, numSlotToAllocate);
+ ResultSet rs = ps.executeQuery();
+ assertTrue(rs.next());
+ int retValue = rs.getInt(1);
+ assertEquals(expectedValue, retValue);
+ }
+
+ private void assertExpectedCurrentValueForSequence(int expectedValue) throws SQLException {
+ assertExpectedCurrentValueForSequence(expectedValue, "bulkalloc.alpha");
+ }
+
+ private void assertExpectedCurrentValueForSequence(int expectedValue, String sequenceName) throws SQLException {
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery(String.format(SELECT_CURRENT_VALUE_SQL, sequenceName));
+ assertTrue(rs.next());
+ assertEquals(expectedValue, rs.getInt(1));
+ }
+
+ private void assertExpectedNextValueForSequence(int expectedValue) throws SQLException {
+ assertExpectedNextValueForSequence(expectedValue, "bulkalloc.alpha");
+ }
+
+ private void assertExpectedNextValueForSequence(int expectedValue, String sequenceName) throws SQLException {
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery(String.format(SELECT_NEXT_VALUE_SQL, sequenceName));
+ assertTrue(rs.next());
+ assertEquals(expectedValue, rs.getInt(1));
+ }
+
+
+ /**
+ * Returns a non-tenant specific connection.
+ */
+ private void nextGenericConnection() throws Exception {
+ if (conn != null) conn.close();
+ long ts = nextTimestamp();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ conn = DriverManager.getConnection(getUrl(), props);
+ }
+
+ private void nextConnection() throws Exception {
+ if (conn != null) conn.close();
+ long ts = nextTimestamp();
+ if (tenantId != null) {
+ // Create tenant specific connection
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(nextTimestamp()));
+ this.conn = DriverManager.getConnection(getUrl() + ';' + TENANT_ID_ATTRIB + '=' + "tenant1", props);
+
+ } else {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ conn = DriverManager.getConnection(getUrl(), props);
+ }
+ }
+
+ private void assertExpectedStateInSystemSequence(SequenceProperties props, long currentValue)
+ throws SQLException {
+ // Validate state in System.Sequence
+ ResultSet rs =
+ conn.createStatement()
+ .executeQuery(
+ "SELECT start_with, current_value, increment_by, cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name FROM SYSTEM.\"SEQUENCE\"");
+ assertTrue(rs.next());
+ assertEquals(props.startsWith, rs.getLong("start_with"));
+ assertEquals(props.incrementBy, rs.getLong("increment_by"));
+ assertEquals(props.cacheSize, rs.getLong("cache_size"));
+ assertEquals(false, rs.getBoolean("cycle_flag"));
+ assertEquals("BULKALLOC", rs.getString("sequence_schema"));
+ assertEquals("ALPHA", rs.getString("sequence_name"));
+ assertEquals(currentValue, rs.getLong("current_value"));
+ assertEquals(props.minValue, rs.getLong("min_value"));
+ assertEquals(props.maxValue, rs.getLong("max_value"));
+ assertFalse(rs.next());
+ }
+
+ private void assertExpectedNumberOfValuesAllocated(long firstValue, long lastValue,
+ int incrementBy, long numAllocated) {
+ int cnt = 0;
+ for (long i = firstValue; (incrementBy > 0 ? i <= lastValue : i >= lastValue); i += incrementBy) {
+ cnt++;
+ }
+ assertEquals("Incorrect number of values allocated: " + cnt, numAllocated, cnt);
+ }
+
+ private static class SequenceProperties {
+
+ private final long numAllocated;
+ private final int incrementBy;
+ private final int startsWith;
+ private final int cacheSize;
+ private final long minValue;
+ private final long maxValue;
+
+ public SequenceProperties(Builder builder) {
+ this.numAllocated = builder.numAllocated;
+ this.incrementBy = builder.incrementBy;
+ this.startsWith = builder.startsWith;
+ this.cacheSize = builder.cacheSize;
+ this.minValue = builder.minValue;
+ this.maxValue = builder.maxValue;
+ }
+
+ private static class Builder {
+
+ long maxValue = Long.MAX_VALUE;
+ long minValue = Long.MIN_VALUE;
+ long numAllocated = 100;
+ int incrementBy = 1;
+ int startsWith = 1;
+ int cacheSize = 100;
+
+ public Builder numAllocated(long numAllocated) {
+ this.numAllocated = numAllocated;
+ return this;
+ }
+
+ public Builder startsWith(int startsWith) {
+ this.startsWith = startsWith;
+ return this;
+ }
+
+ public Builder cacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ return this;
+ }
+
+ public Builder incrementBy(int incrementBy) {
+ this.incrementBy = incrementBy;
+ return this;
+ }
+
+ public Builder minValue(long minValue) {
+ this.minValue = minValue;
+ return this;
+ }
+
+ public Builder maxValue(long maxValue) {
+ this.maxValue = maxValue;
+ return this;
+ }
+
+ public SequenceProperties build() {
+ return new SequenceProperties(this);
+ }
+
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index ca5e778..10fda68 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -903,7 +903,10 @@ term returns [ParseNode ret]
}
| (n=NEXT | CURRENT) VALUE FOR s=from_table_name
{ contextStack.peek().hasSequences(true);
- $ret = n==null ? factory.currentValueFor(s) : factory.nextValueFor(s); }
+ $ret = n==null ? factory.currentValueFor(s) : factory.nextValueFor(s, null); }
+ | (n=NEXT) lorb=literal_or_bind VALUES FOR s=from_table_name
+ { contextStack.peek().hasSequences(true);
+ $ret = factory.nextValueFor(s, lorb); }
;
one_or_more_expressions returns [List<ParseNode> ret]
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index ede6d9c..5ec8cd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -24,16 +24,23 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SequenceValueParseNode;
import org.apache.phoenix.parse.SequenceValueParseNode.Op;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.tuple.DelegateTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.SequenceUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -41,7 +48,7 @@ import com.google.common.collect.Maps;
public class SequenceManager {
private final PhoenixStatement statement;
private int[] sequencePosition;
- private List<SequenceKey> nextSequences;
+ private List<SequenceAllocation> nextSequences;
private List<SequenceKey> currentSequences;
private final Map<SequenceKey,SequenceValueExpression> sequenceMap = Maps.newHashMap();
private final BitSet isNextSequence = new BitSet();
@@ -113,20 +120,30 @@ public class SequenceManager {
}
}
- public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) {
+ public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException {
PName tenantName = statement.getConnection().getTenantId();
String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets();
+ ParseNode numToAllocateNode = node.getNumToAllocateNode();
+
+ long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode);
SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets);
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
- expression = new SequenceValueExpression(key, node.getOp(), index);
+ expression = new SequenceValueExpression(key, node.getOp(), index, numToAllocate);
sequenceMap.put(key, expression);
- } else if (expression.op != node.getOp()){
- expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex());
- }
+ } else if (expression.op != node.getOp() || expression.getNumToAllocate() < numToAllocate) {
+ // Keep the maximum allocation size we see in a statement
+ SequenceValueExpression oldExpression = expression;
+ expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate));
+ if (oldExpression.getNumToAllocate() < numToAllocate) {
+ // If we found a NEXT VALUE expression with a higher number to allocate
+ // We override the original expression
+ sequenceMap.put(key, expression);
+ }
+ }
// If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT
if (node.getOp() == Op.NEXT_VALUE) {
isNextSequence.set(expression.getIndex());
@@ -134,6 +151,38 @@ public class SequenceManager {
return expression;
}
+
+ /**
+ * If caller specified used NEXT <n> VALUES FOR <seq> expression then we have set the numToAllocate.
+ * If numToAllocate is > 1 we treat this as a bulk reservation of a block of sequence slots.
+ *
+ * @throws a SQLException if we can't compile the expression
+ */
+ private long determineNumToAllocate(TableName sequenceName, ParseNode numToAllocateNode)
+ throws SQLException {
+
+ if (numToAllocateNode != null) {
+ final StatementContext context = new StatementContext(statement);
+ ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+ Expression expression = numToAllocateNode.accept(expressionCompiler);
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ expression.evaluate(null, ptr);
+ if (ptr.getLength() == 0 || !expression.getDataType().isCoercibleTo(PLong.INSTANCE)) {
+ throw SequenceUtil.getException(sequenceName.getSchemaName(), sequenceName.getTableName(), SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT);
+ }
+
+ // Parse <n> and make sure it is greater than 0. We don't support allocating 0 or negative values!
+ long numToAllocate = (long) PLong.INSTANCE.toObject(ptr, expression.getDataType());
+ if (numToAllocate < 1) {
+ throw SequenceUtil.getException(sequenceName.getSchemaName(), sequenceName.getTableName(), SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT);
+ }
+ return numToAllocate;
+
+ } else {
+ // Standard Sequence Allocation Behavior
+ return SequenceUtil.DEFAULT_NUM_SLOTS_TO_ALLOCATE;
+ }
+ }
public void validateSequences(Sequence.ValueOp action) throws SQLException {
if (sequenceMap.isEmpty()) {
@@ -146,14 +195,17 @@ public class SequenceManager {
currentSequences = Lists.newArrayListWithExpectedSize(maxSize);
for (Map.Entry<SequenceKey, SequenceValueExpression> entry : sequenceMap.entrySet()) {
if (isNextSequence.get(entry.getValue().getIndex())) {
- nextSequences.add(entry.getKey());
+ nextSequences.add(new SequenceAllocation(entry.getKey(), entry.getValue().getNumToAllocate()));
} else {
currentSequences.add(entry.getKey());
}
}
long[] srcSequenceValues = new long[nextSequences.size()];
SQLException[] sqlExceptions = new SQLException[nextSequences.size()];
+
+ // Sort the next sequences to prevent deadlocks
Collections.sort(nextSequences);
+
// Create reverse indexes
for (int i = 0; i < nextSequences.size(); i++) {
sequencePosition[i] = sequenceMap.get(nextSequences.get(i)).getIndex();
@@ -168,4 +220,6 @@ public class SequenceManager {
services.validateSequences(nextSequences, timestamp, srcSequenceValues, sqlExceptions, action);
setSequenceValues(srcSequenceValues, dstSequenceValues, sqlExceptions);
}
-}
+
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
index cdaae68..71e2d02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java
@@ -32,13 +32,23 @@ public class SequenceValueExpression extends BaseTerminalExpression {
private final SequenceKey key;
final Op op;
private final int index;
+ private final long numToAllocate;
- public SequenceValueExpression(SequenceKey key, Op op, int index) {
+ public SequenceValueExpression(SequenceKey key, Op op, int index, long numToAllocate) {
this.key = key;
this.op = op;
this.index = index;
+ this.numToAllocate = numToAllocate;
}
+ public long getNumToAllocate() {
+ return numToAllocate;
+ }
+
+ public SequenceKey getKey() {
+ return key;
+ }
+
public int getIndex() {
return index;
}
@@ -73,7 +83,7 @@ public class SequenceValueExpression extends BaseTerminalExpression {
@Override
public String toString() {
- return op.getName() + " VALUE FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName());
+ return op.getName() + (numToAllocate == 1 ? " VALUE " : (" " + numToAllocate + " VALUES " )) + "FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 9b5f040..bb6c4b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -41,17 +41,17 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.schema.Sequence;
-import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SequenceUtil;
@@ -77,6 +77,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION";
public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE";
public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE";
+ public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE";
private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS));
private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
@@ -138,10 +139,8 @@ public class SequenceRegionObserver extends BaseRegionObserver {
if (result.isEmpty()) {
return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
}
- if (validateOnly) {
- return result;
- }
+
KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
KeyValue incrementByKV = Sequence.getIncrementByKV(result);
KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
@@ -224,6 +223,28 @@ public class SequenceRegionObserver extends BaseRegionObserver {
cycleKV.getValueOffset(), cycleKV.getValueLength());
}
+ long numSlotsToAllocate = calculateNumSlotsToAllocate(increment);
+
+ // We don't support Bulk Allocations on sequences that have the CYCLE flag set to true
+ if (cycle && !SequenceUtil.isCycleAllowed(numSlotsToAllocate)) {
+ return getErrorResult(row, maxTimestamp, SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode());
+ }
+
+ // Bulk Allocations are expressed by NEXT <n> VALUES FOR
+ if (SequenceUtil.isBulkAllocation(numSlotsToAllocate)) {
+ if (SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate)) {
+ // If we try to allocate more slots than the limit we return an error.
+ // Allocating sequence values in bulk should be an all or nothing operation.
+ // If the operation succeeds clients are guaranteed that they have reserved
+ // all the slots requested.
+ return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode());
+ }
+ }
+
+ if (validateOnly) {
+ return result;
+ }
+
// return if we have run out of sequence values
if (limitReached) {
if (cycle) {
@@ -231,16 +252,15 @@ public class SequenceRegionObserver extends BaseRegionObserver {
currentValue = increasingSeq ? minValue : maxValue;
}
else {
- SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
- : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
- return getErrorResult(row, maxTimestamp, code.getErrorCode());
+ return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode());
}
}
-
+
// check if the limit was reached
- limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize);
- // update currentValue
- currentValue += incrementBy * cacheSize;
+ limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate);
+
+ // update currentValue
+ currentValue += incrementBy * (SequenceUtil.isBulkAllocation(numSlotsToAllocate) ? numSlotsToAllocate : cacheSize);
// update the currentValue of the Result row
KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
@@ -264,6 +284,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
region.closeRegionOperation();
}
}
+
/**
* Creates a new KeyValue for a long value
@@ -417,5 +438,20 @@ public class SequenceRegionObserver extends BaseRegionObserver {
region.closeRegionOperation();
}
}
+
+ /**
+ * Determines whether a request for incrementing the sequence was a bulk allocation and if so
+ * what the number of slots to allocate is. This is triggered by the NEXT <n> VALUES FOR expression.
+ * For backwards compatibility with older clients, we default the value to 1 which preserves
+ * existing behavior when invoking NEXT VALUE FOR.
+ */
+ private long calculateNumSlotsToAllocate(final Increment increment) {
+ long numToAllocate = 1;
+ byte[] numToAllocateBytes = increment.getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE);
+ if (numToAllocateBytes != null) {
+ numToAllocate = Bytes.toLong(numToAllocateBytes);
+ }
+ return numToAllocate;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index cc8b02a..b9f81fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -276,6 +276,8 @@ public enum SQLExceptionCode {
SEQUENCE_VAL_REACHED_MAX_VALUE(1212, "42Z12", "Reached MAXVALUE of sequence"),
SEQUENCE_VAL_REACHED_MIN_VALUE(1213, "42Z13", "Reached MINVALUE of sequence"),
INCREMENT_BY_MUST_NOT_BE_ZERO(1214, "42Z14", "Sequence INCREMENT BY value cannot be zero"),
+ NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT(1215, "42Z15", "Sequence NEXT n VALUES FOR must be a postive integer or constant." ),
+ NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED(1216, "42Z16", "Sequence NEXT n VALUES FOR is not supported for Sequences with the CYCLE flag" ),
/** Parser error. (errorcode 06, sqlState 42P) */
PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f323ec4..9689589 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -232,7 +232,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
}
protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException {
- QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
return connection.getQueryServices().getOptimizer().optimize(this, plan);
}
@@ -245,7 +245,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
public PhoenixResultSet call() throws SQLException {
final long startTime = System.currentTimeMillis();
try {
- QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
plan = connection.getQueryServices().getOptimizer().optimize(
PhoenixStatement.this, plan);
// this will create its own trace internally, so we don't wrap this
@@ -303,7 +303,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
// since they'd update data directly from coprocessors, and should thus operate on
// the latest state
try {
- MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
MutationState state = plan.execute();
connection.getMutationState().join(state);
if (connection.getAutoCommit()) {
@@ -1203,14 +1203,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
if (stmt.getOperation().isMutation()) {
throw new ExecuteQueryNotApplicableException(query);
}
- return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ return stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
}
public MutationPlan compileMutation(CompilableStatement stmt, String query) throws SQLException {
if (!stmt.getOperation().isMutation()) {
throw new ExecuteUpdateNotApplicableException(query);
}
- return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE);
+ return stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE);
}
public MutationPlan compileMutation(String sql) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 44359a7..49b14c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -310,11 +310,11 @@ public class ParseNodeFactory {
}
public SequenceValueParseNode currentValueFor(TableName tableName) {
- return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.CURRENT_VALUE);
+ return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.CURRENT_VALUE, null);
}
- public SequenceValueParseNode nextValueFor(TableName tableName) {
- return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.NEXT_VALUE);
+ public SequenceValueParseNode nextValueFor(TableName tableName, ParseNode numToAllocateNode) {
+ return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.NEXT_VALUE, numToAllocateNode);
}
public AddColumnStatement addColumn(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, ListMultimap<String,Pair<String,Object>> props) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
index a5d60fe..1fc670c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
@@ -39,10 +39,16 @@ public class SequenceValueParseNode extends TerminalParseNode {
}
private final TableName tableName;
private final Op op;
+ private final ParseNode numToAllocate;
- public SequenceValueParseNode(TableName tableName, Op op) {
+ public SequenceValueParseNode(TableName tableName, Op op, ParseNode numToAllocate) {
this.tableName = tableName;
this.op = op;
+ this.numToAllocate = numToAllocate;
+ }
+
+ public ParseNode getNumToAllocateNode() {
+ return numToAllocate;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index dc51b10..3b53309 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.stats.PTableStats;
@@ -91,8 +92,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
long createSequence(String tenantId, String schemaName, String sequenceName, long startWith, long incrementBy, long cacheSize, long minValue, long maxValue, boolean cycle, long timestamp) throws SQLException;
long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException;
- void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException;
- void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException;
+ void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException;
+ void incrementSequences(List<SequenceAllocation> sequenceAllocation, long timestamp, long[] values, SQLException[] exceptions) throws SQLException;
long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException;
void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) throws SQLException;