You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/17 09:50:40 UTC
[1/5] git commit: PHOENIX-1357 Salt sequence table to prevent same RS
from getting all sequence calls
Repository: phoenix
Updated Branches:
refs/heads/3.0 81c300cb8 -> 3f254dc4d
PHOENIX-1357 Salt sequence table to prevent same RS from getting all sequence calls
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c884937
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c884937
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c884937
Branch: refs/heads/3.0
Commit: 2c88493763d5f2b2cc8857d6ad26208ddd8c7e74
Parents: 9833f07
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Oct 15 22:12:15 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Oct 16 20:58:17 2014 -0700
----------------------------------------------------------------------
.../phoenix/coprocessor/MetaDataProtocol.java | 5 +-
.../query/ConnectionQueryServicesImpl.java | 26 +--
.../apache/phoenix/query/QueryConstants.java | 4 +-
.../org/apache/phoenix/schema/Sequence.java | 9 +-
.../org/apache/phoenix/schema/SequenceKey.java | 49 +++---
.../org/apache/phoenix/util/SchemaUtil.java | 8 -
.../org/apache/phoenix/util/UpgradeUtil.java | 167 +++++++++++++++++++
7 files changed, 214 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 606243d..3944d9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -62,9 +62,8 @@ public interface MetaDataProtocol extends CoprocessorProtocol {
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
public static final long MIN_TABLE_TIMESTAMP = 0;
- // Incremented with the addition of INDEX_TYPE to SYSTEM.CATALOG (though it's unused in 3.0)
- // plus the addition of MIN_VALUE, MAX_VALUE, and CYCLE to SYSTEM.SEQUENCE.
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 3;
+ // Incremented from 3 to 4 to salt the sequence table in 3.2/4.2
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 4;
public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
// TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 3a03401..f4d5699 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -17,10 +17,6 @@
*/
package org.apache.phoenix.query;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
@@ -115,6 +111,7 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1328,13 +1325,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
} catch (TableAlreadyExistsException ignore) {
- // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include
- // any new columns we've added.
- metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
- PhoenixDatabaseMetaData.INDEX_TYPE + " " + PDataType.UNSIGNED_TINYINT.getSqlTypeName() +
- ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PDataType.LONG.getSqlTypeName());
}
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
@@ -1344,13 +1334,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (TableAlreadyExistsException ignore) {
// This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
// any new columns we've added.
- String newColumns =
- MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
- + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", "
- + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", "
- + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName();
- metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
- MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns);
+ if (UpgradeUtil.addSaltByteToSequenceTable(metaConnection)) {
+ metaConnection.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ PhoenixDatabaseMetaData.TYPE_SEQUENCE,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+ clearCache();
+ }
}
try {
metaConnection.createStatement().executeUpdate(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b26b426..d2d25fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -99,6 +99,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.util.ByteUtil;
@@ -266,6 +267,7 @@ public interface QueryConstants {
CYCLE_FLAG + " BOOLEAN, \n" +
LIMIT_REACHED_FLAG + " BOOLEAN \n" +
" CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
- HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n";
+ HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+ "SALT_BUCKETS=" + SaltingUtil.MAX_BUCKET_NUM + "\n";
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index e992126..c9603e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -49,7 +49,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.SequenceUtil;
import com.google.common.collect.Lists;
@@ -216,7 +215,7 @@ public class Sequence {
}
private Append newReturn(SequenceValue value) {
- byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), this.key.getSchemaName(), this.key.getSequenceName());
+ byte[] key = this.key.getKey();
Append append = new Append(key);
byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()};
append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf);
@@ -273,7 +272,7 @@ public class Sequence {
}
public Increment newIncrement(long timestamp, Sequence.ValueOp action) {
- Increment inc = new Increment(SchemaUtil.getSequenceKey(key.getTenantId(), key.getSchemaName(), key.getSequenceName()));
+ Increment inc = new Increment(key.getKey());
// It doesn't matter what we set the amount too - we always use the values we get
// from the Get we do to prevent any race conditions. All columns that get added
// are returned with their current value
@@ -465,7 +464,7 @@ public class Sequence {
}
public Append createSequence(long startWith, long incrementBy, long cacheSize, long timestamp, long minValue, long maxValue, boolean cycle) {
- byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), this.key.getSchemaName(), this.key.getSequenceName());
+ byte[] key = this.key.getKey();
Append append = new Append(key);
append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new byte[] {(byte)MetaOp.CREATE_SEQUENCE.ordinal()});
if (timestamp != HConstants.LATEST_TIMESTAMP) {
@@ -505,7 +504,7 @@ public class Sequence {
}
public Append dropSequence(long timestamp) {
- byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), this.key.getSchemaName(), this.key.getSequenceName());
+ byte[] key = this.key.getKey();
Append append = new Append(key);
append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new byte[] {(byte)MetaOp.DROP_SEQUENCE.ordinal()});
if (timestamp != HConstants.LATEST_TIMESTAMP) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
index 644fc4a..c25e438 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
@@ -17,37 +17,29 @@
*/
package org.apache.phoenix.schema;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
-public class SequenceKey implements Comparable<SequenceKey> {
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((tenantId == null) ? 0 : tenantId.hashCode());
- result = prime * result + ((schemaName == null) ? 0 : schemaName.hashCode());
- result = prime * result + sequenceName.hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- SequenceKey other = (SequenceKey)obj;
- return this.compareTo(other) == 0;
- }
+public class SequenceKey implements Comparable<SequenceKey> {
private final String tenantId;
private final String schemaName;
private final String sequenceName;
+ private final byte[] key;
public SequenceKey(String tenantId, String schemaName, String sequenceName) {
this.tenantId = tenantId;
this.schemaName = schemaName;
this.sequenceName = sequenceName;
+ this.key = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(tenantId), QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(sequenceName));
+ key[0] = SaltingUtil.getSaltingByte(key, SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES, SaltingUtil.MAX_BUCKET_NUM);
}
+ public byte[] getKey() {
+ return key;
+
+ }
public String getTenantId() {
return tenantId;
}
@@ -71,4 +63,23 @@ public class SequenceKey implements Comparable<SequenceKey> {
}
return c;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((tenantId == null) ? 0 : tenantId.hashCode());
+ result = prime * result + ((schemaName == null) ? 0 : schemaName.hashCode());
+ result = prime * result + sequenceName.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ SequenceKey other = (SequenceKey)obj;
+ return this.compareTo(other) == 0;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 80d0348..b5837de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -163,14 +163,6 @@ public class SchemaUtil {
return l3;
}
- public static byte[] getSequenceKey(byte[] tenantId, byte[] schemaName, byte[] sequenceName) {
- return getTableKey(tenantId, schemaName, sequenceName);
- }
-
- public static byte[] getSequenceKey(String tenantId, String schemaName, String sequenceName) {
- return getTableKey(tenantId, schemaName, sequenceName);
- }
-
/**
* Get the key used in the Phoenix metadata row for a table definition
* @param schemaName
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c884937/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
new file mode 100644
index 0000000..4c8a369
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class UpgradeUtil {
+ private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class);
+
+ private UpgradeUtil() {
+ }
+
+ public static boolean addSaltByteToSequenceTable(PhoenixConnection conn) throws SQLException { // returns true only when this call rewrote the rows; false when the SALT_BUCKETS cell was already present
+ logger.info("Upgrading SYSTEM.SEQUENCE table");
+
+ HTableInterface sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); // the SALT_BUCKETS cell below is written to this table
+ try {
+ byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE); // null tenant id
+ logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM);
+ KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+ PDataType.INTEGER.toBytes(SaltingUtil.MAX_BUCKET_NUM));
+ Put put = new Put(seqTableKey);
+ put.add(saltKV);
+ // Prevent multiple clients from doing this upgrade
+ if (!sysTable.checkAndPut(seqTableKey, // expected value is null, so the put is applied only if no SALT_BUCKETS cell exists yet
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, put)) {
+
+ logger.info("SYSTEM.SEQUENCE table has already been upgraded");
+ return false;
+ }
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ sysTable.close();
+ } catch (IOException e) {
+ logger.warn("Exception during close",e); // close failure is logged, not rethrown
+ }
+ }
+ int batchSizeBytes = 100 * 1024; // 100K chunks
+ int sizeBytes = 0; // KeyValue bytes accumulated since the last batch was sent
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
+
+ boolean success = false; // set just before the normal return; checked in the inner finally
+ Scan scan = new Scan();
+ scan.setRaw(true); // raw scan: delete markers are returned too (handled in the Delete branch below)
+ scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS); // read up to this many versions per cell
+ HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ try {
+ logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+ ResultScanner scanner = seqTable.getScanner(scan);
+ try {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ for (KeyValue keyValue : result.raw()) {
+ KeyValue newKeyValue = addSaltByte(keyValue); // same cell with a salt byte prefixed to the row key
+ sizeBytes += newKeyValue.getLength(); // counted for every cell, even types that produce no mutation below
+ if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
+ // Delete old value
+ byte[] buf = keyValue.getBuffer();
+ Delete delete = new Delete(keyValue.getRow());
+ KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
+ buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
+ buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
+ keyValue.getTimestamp(), KeyValue.Type.Delete, // marker uses the old cell's own timestamp
+ ByteUtil.EMPTY_BYTE_ARRAY,0,0);
+ delete.addDeleteMarker(deleteKeyValue);
+ mutations.add(delete);
+ sizeBytes += deleteKeyValue.getLength();
+ // Put new value
+ Put put = new Put(newKeyValue.getRow());
+ put.add(newKeyValue);
+ mutations.add(put);
+ } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
+ // Copy delete marker using new key so that it continues
+ // to delete the key value preceding it that will be updated
+ // as well.
+ Delete delete = new Delete(newKeyValue.getRow());
+ delete.addDeleteMarker(newKeyValue);
+ mutations.add(delete);
+ } // NOTE(review): any other KeyValue type gets no mutation here - confirm that is intended
+ if (sizeBytes >= batchSizeBytes) { // checked after every cell, so a batch may end part-way through a row
+ logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ mutations.clear();
+ sizeBytes = 0;
+ }
+ }
+ }
+ if (!mutations.isEmpty()) { // send whatever is left below the batch threshold
+ logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ }
+ logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
+ success = true;
+ return true;
+ } catch (InterruptedException e) { // NOTE(review): no Thread.currentThread().interrupt() here - confirm whether the interrupt status should be restored
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE"); // reached on any exception after the scanner was opened
+ scanner.close();
+ }
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ seqTable.close();
+ } catch (IOException e) {
+ logger.warn("Exception during close",e); // close failure is logged, not rethrown
+ }
+ }
+ }
+
+ private static KeyValue addSaltByte(KeyValue keyValue) { // builds a copy of keyValue whose row key has a salt byte in front
+ int length = keyValue.getRowLength();
+ int offset = keyValue.getRowOffset();
+ byte[] buf = keyValue.getBuffer();
+ byte[] newBuf = new byte[length + 1]; // assumes SaltingUtil.NUM_SALTING_BYTES == 1 - TODO confirm
+ System.arraycopy(buf, offset, newBuf, SaltingUtil.NUM_SALTING_BYTES, length); // original row key goes after the salt slot
+ newBuf[0] = SaltingUtil.getSaltingByte(newBuf, SaltingUtil.NUM_SALTING_BYTES, length, SaltingUtil.MAX_BUCKET_NUM); // computed over the original row bytes only
+ return new KeyValue(newBuf, 0, newBuf.length, // new row key; family, qualifier, timestamp, type and value come from the original cell
+ buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
+ buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
+ keyValue.getTimestamp(), KeyValue.Type.codeToType(keyValue.getType()),
+ buf, keyValue.getValueOffset(), keyValue.getValueLength());
+ }
+
+}
[4/5] git commit: PHOENIX-1361 Sequence value goes backwards if
sequence validated before reserved
Posted by ja...@apache.org.
PHOENIX-1361 Sequence value goes backwards if sequence validated before reserved
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e33c99d1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e33c99d1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e33c99d1
Branch: refs/heads/3.0
Commit: e33c99d1f24b9c93f383c8733c40573d7b005287
Parents: 2c88493
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Oct 16 23:04:36 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 17 00:52:57 2014 -0700
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/SequenceIT.java | 41 +++++++
.../apache/phoenix/compile/SequenceManager.java | 28 +++--
.../coprocessor/MetaDataEndpointImpl.java | 3 -
.../phoenix/expression/DelegateExpression.java | 108 +++++++++++++++++++
.../phoenix/parse/SequenceValueParseNode.java | 14 ++-
.../query/ConnectionQueryServicesImpl.java | 12 +--
.../org/apache/phoenix/schema/Sequence.java | 32 +++---
7 files changed, 203 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index 690b6c5..d610633 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -1205,4 +1206,44 @@ public class SequenceIT extends BaseClientManagedTimeIT {
+ unexpectedExceptions + " missing exceptions : " + missingExceptions);
}
}
+
+ @Test
+ public void testValidateBeforeReserve() throws Exception { // checks that sequence values keep increasing (1, 2, 3, 4) when the sequence is first referenced by an EXPLAIN
+ nextConnection();
+ conn.createStatement().execute(
+ "CREATE TABLE foo (k VARCHAR PRIMARY KEY, l BIGINT)");
+ conn.createStatement().execute(
+ "CREATE SEQUENCE foo.bar");
+
+ nextConnection();
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT NEXT VALUE FOR foo.bar FROM foo"); // issued before any value is consumed; presumably validates the sequence without reserving - confirm
+ assertTrue(rs.next());
+ conn.createStatement().execute(
+ "UPSERT INTO foo VALUES ('a', NEXT VALUE FOR foo.bar)");
+ conn.createStatement().execute(
+ "UPSERT INTO foo VALUES ('b', NEXT VALUE FOR foo.bar)");
+ conn.commit();
+
+ nextConnection();
+ rs = conn.createStatement().executeQuery("SELECT * FROM foo");
+ assertTrue(rs.next());
+ assertEquals("a",rs.getString(1));
+ assertEquals(1,rs.getLong(2)); // value stored by the first UPSERT
+ assertTrue(rs.next());
+ assertEquals("b",rs.getString(1));
+ assertEquals(2,rs.getLong(2)); // value stored by the second UPSERT
+ assertFalse(rs.next());
+
+ nextConnection();
+ PreparedStatement stmt = conn.prepareStatement("SELECT NEXT VALUE FOR foo.bar FROM foo");
+ ParameterMetaData md = stmt.getParameterMetaData(); // metadata is requested before executeQuery()
+ assertEquals(0,md.getParameterCount());
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(3, rs.getLong(1)); // continues after the two values used by the UPSERTs
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1)); // one value per row of foo (two rows)
+ assertFalse(rs.next());
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index 9be45a4..03091c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.expression.BaseTerminalExpression;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.parse.SequenceValueParseNode.Op;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PDataType;
@@ -37,6 +38,7 @@ import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.tuple.DelegateTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SchemaUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,8 +48,8 @@ public class SequenceManager {
private int[] sequencePosition;
private List<SequenceKey> nextSequences;
private List<SequenceKey> currentSequences;
- private Map<SequenceKey,SequenceValueExpression> sequenceMap;
- private BitSet isNextSequence;
+ private final Map<SequenceKey,SequenceValueExpression> sequenceMap = Maps.newHashMap();
+ private final BitSet isNextSequence = new BitSet();
public SequenceManager(PhoenixStatement statement) {
this.statement = statement;
@@ -117,10 +119,6 @@ public class SequenceManager {
}
public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) {
- if (sequenceMap == null) {
- sequenceMap = Maps.newHashMap();
- isNextSequence = new BitSet();
- }
PName tenantName = statement.getConnection().getTenantId();
String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
@@ -128,11 +126,13 @@ public class SequenceManager {
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
- expression = new SequenceValueExpression(index);
+ expression = new SequenceValueExpression(key, node.getOp(), index);
sequenceMap.put(key, expression);
+ } else if (expression.op != node.getOp()){
+ expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex());
}
// If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT
- if (node.getOp() == SequenceValueParseNode.Op.NEXT_VALUE) {
+ if (node.getOp() == Op.NEXT_VALUE) {
isNextSequence.set(expression.getIndex());
}
@@ -140,7 +140,7 @@ public class SequenceManager {
}
public void validateSequences(Sequence.ValueOp action) throws SQLException {
- if (sequenceMap == null || sequenceMap.isEmpty()) {
+ if (sequenceMap.isEmpty()) {
return;
}
int maxSize = sequenceMap.size();
@@ -174,9 +174,13 @@ public class SequenceManager {
}
private class SequenceValueExpression extends BaseTerminalExpression {
+ private final SequenceKey key;
+ private final Op op;
private final int index;
- private SequenceValueExpression(int index) {
+ private SequenceValueExpression(SequenceKey key, Op op, int index) {
+ this.key = key;
+ this.op = op;
this.index = index;
}
@@ -212,5 +216,9 @@ public class SequenceManager {
return true;
}
+ @Override
+ public String toString() {
+ return op.getName() + " VALUE FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index dd8c3cb..ee8833b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -571,17 +571,14 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
// Get as of latest timestamp so we can detect if we have a newer table that already exists
// without making an additional query
- if (logger.isDebugEnabled()) logger.debug("Loading " + SchemaUtil.getTableName(schemaName, lockTableName) + " as of timestamp " + clientTimeStamp);
PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
if (table != null) {
- if (logger.isDebugEnabled()) logger.debug("Found " + table.getName().getString() + " with timestamp of " + table.getTimeStamp());
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and it's deleted, continue
if (!isTableDeleted(table)) {
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
}
} else {
- if (logger.isDebugEnabled()) logger.debug("Returning NEWER_TABLE_FOUND result for " + table.getName().getString());
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java
new file mode 100644
index 0000000..87a0bc0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/DelegateExpression.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class DelegateExpression implements Expression {
+ private final Expression delegate;
+
+ public DelegateExpression(Expression delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean isNullable() {
+ return delegate.isNullable();
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return delegate.getDataType();
+ }
+
+ @Override
+ public Integer getMaxLength() {
+ return delegate.getMaxLength();
+ }
+
+ @Override
+ public Integer getScale() {
+ return delegate.getScale();
+ }
+
+ @Override
+ public SortOrder getSortOrder() {
+ return delegate.getSortOrder();
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ delegate.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ delegate.write(output);
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ return delegate.evaluate(tuple, ptr);
+ }
+
+ @Override
+ public <T> T accept(ExpressionVisitor<T> visitor) {
+ return delegate.accept(visitor);
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return delegate.getChildren();
+ }
+
+ @Override
+ public void reset() {
+ delegate.reset();
+ }
+
+ @Override
+ public boolean isStateless() {
+ return delegate.isStateless();
+ }
+
+ @Override
+ public Determinism getDeterminism() {
+ return delegate.getDeterminism();
+ }
+
+ @Override
+ public boolean requiresFinalEvaluation() {
+ return delegate.requiresFinalEvaluation();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
index 227d78b..f29d79e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java
@@ -22,7 +22,19 @@ import java.sql.SQLException;
public class SequenceValueParseNode extends TerminalParseNode {
- public enum Op {NEXT_VALUE, CURRENT_VALUE};
+ public enum Op {
+ NEXT_VALUE("NEXT"),
+ CURRENT_VALUE("CURRENT");
+
+ private final String name;
+ Op(String name) {
+ this.name = name;
+ }
+ public String getName() {
+ return name;
+ };
+
+ }
private final TableName tableName;
private final Op op;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f4d5699..7d7afa7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1567,7 +1567,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*/
@Override
public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 0, action);
+ incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, action);
}
/**
@@ -1583,10 +1583,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
*/
@Override
public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException {
- incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 1, Sequence.ValueOp.RESERVE_SEQUENCE);
+ incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
}
- private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, int factor, Sequence.ValueOp action) throws SQLException {
+ private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size());
for (SequenceKey key : keys) {
Sequence newSequences = new Sequence(key);
@@ -1607,11 +1607,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (int i = 0; i < sequences.size(); i++) {
Sequence sequence = sequences.get(i);
try {
- values[i] = sequence.incrementValue(timestamp, factor, action);
+ values[i] = sequence.incrementValue(timestamp, op);
} catch (EmptySequenceCacheException e) {
indexes[toIncrementList.size()] = i;
toIncrementList.add(sequence);
- Increment inc = sequence.newIncrement(timestamp, action);
+ Increment inc = sequence.newIncrement(timestamp, op);
incrementBatch.add(inc);
} catch (SQLException e) {
exceptions[i] = e;
@@ -1648,7 +1648,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Sequence sequence = toIncrementList.get(i);
Result result = (Result)resultObjects[i];
try {
- values[indexes[i]] = sequence.incrementValue(result, factor);
+ values[indexes[i]] = sequence.incrementValue(result, op);
} catch (SQLException e) {
exceptions[indexes[i]] = e;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e33c99d1/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index c9603e7..dc3d2b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -57,7 +57,7 @@ import com.google.common.math.LongMath;
public class Sequence {
public static final int SUCCESS = 0;
- public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE};
+ public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE, INCREMENT_SEQUENCE};
public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
// create empty Sequence key values used while created a sequence row
@@ -139,10 +139,10 @@ public class Sequence {
return value.isDeleted ? null : value;
}
- private long increment(SequenceValue value, int factor) throws SQLException {
- boolean increasingSeq = value.incrementBy > 0;
+ private long increment(SequenceValue value, ValueOp op) throws SQLException {
+ boolean increasingSeq = value.incrementBy > 0 && op != ValueOp.VALIDATE_SEQUENCE;
// check if the the sequence has already reached the min/max limit
- if (value.limitReached) {
+ if (value.limitReached && op != ValueOp.VALIDATE_SEQUENCE) {
if (value.cycle) {
value.limitReached=false;
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
@@ -156,12 +156,11 @@ public class Sequence {
}
long returnValue = value.currentValue;
- if (factor != 0) {
+ if (op == ValueOp.INCREMENT_SEQUENCE) {
boolean overflowOrUnderflow=false;
// advance currentValue while checking for overflow
try {
- long incrementValue = LongMath.checkedMultiply(value.incrementBy, factor);
- value.currentValue = LongMath.checkedAdd(value.currentValue, incrementValue);
+ value.currentValue = LongMath.checkedAdd(value.currentValue, value.incrementBy);
} catch (ArithmeticException e) {
overflowOrUnderflow = true;
}
@@ -176,18 +175,18 @@ public class Sequence {
return returnValue;
}
- public long incrementValue(long timestamp, int factor, ValueOp action) throws SQLException {
+ public long incrementValue(long timestamp, ValueOp op) throws SQLException {
SequenceValue value = findSequenceValue(timestamp);
if (value == null) {
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
}
if (value.currentValue == value.nextValue) {
- if (action == ValueOp.VALIDATE_SEQUENCE) {
+ if (op == ValueOp.VALIDATE_SEQUENCE) {
return value.currentValue;
}
throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
}
- return increment(value, factor);
+ return increment(value, op);
}
public List<Append> newReturns() {
@@ -245,7 +244,7 @@ public class Sequence {
return key;
}
- public long incrementValue(Result result, int factor) throws SQLException {
+ public long incrementValue(Result result, ValueOp op) throws SQLException {
// In this case, we don't definitely know the timestamp of the deleted sequence,
// but we know anything older is likely deleted. Worse case, we remove a sequence
// from the cache that we shouldn't have which will cause a gap in sequence values.
@@ -266,9 +265,9 @@ public class Sequence {
.build().buildException();
}
// If we found the sequence, we update our cache with the new value
- SequenceValue value = new SequenceValue(result);
+ SequenceValue value = new SequenceValue(result, op);
insertSequenceValue(value);
- return increment(value, factor);
+ return increment(value, op);
}
public Increment newIncrement(long timestamp, Sequence.ValueOp action) {
@@ -418,7 +417,7 @@ public class Sequence {
return this.incrementBy == 0;
}
- public SequenceValue(Result r) {
+ public SequenceValue(Result r, ValueOp op) {
KeyValue currentValueKV = getCurrentValueKV(r);
KeyValue incrementByKV = getIncrementByKV(r);
KeyValue cacheSizeKV = getCacheSizeKV(r);
@@ -433,7 +432,10 @@ public class Sequence {
this.maxValue = PDataType.LONG.getCodec().decodeLong(maxValueKV.getBuffer(), maxValueKV.getValueOffset(), SortOrder.getDefault());
this.cycle = (Boolean)PDataType.BOOLEAN.toObject(cycleKV.getBuffer(), cycleKV.getValueOffset(), cycleKV.getValueLength());
this.limitReached = false;
- currentValue = nextValue - incrementBy * cacheSize;
+ currentValue = nextValue;
+ if (op != ValueOp.VALIDATE_SEQUENCE) {
+ currentValue -= incrementBy * cacheSize;
+ }
}
}
[3/5] git commit: Backport timestamp fix for MetaDataEndPointImpl to
3.0
Posted by ja...@apache.org.
Backport timestamp fix for MetaDataEndPointImpl to 3.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/660b7d99
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/660b7d99
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/660b7d99
Branch: refs/heads/3.0
Commit: 660b7d99ca1d891bcba64c8fad0e30f7fbdc47b1
Parents: d68354d
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Oct 16 17:43:42 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Oct 16 20:58:17 2014 -0700
----------------------------------------------------------------------
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/660b7d99/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 74685d6..dd8c3cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -460,13 +460,14 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
schemaName.getString(), tableName.getString())) : physicalTables.get(0);
PTableStats stats = PTableStats.EMPTY_STATS;
if (tenantId == null) {
- HTableInterface statsHTable = getEnvironment().getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+ HTableInterface statsHTable = null;
try {
+ statsHTable = getEnvironment().getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?");
} finally {
- statsHTable.close();
+ if (statsHTable != null) statsHTable.close();
}
}
return PTableImpl
@@ -508,12 +509,12 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
return table.getName() == null;
}
- private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp) throws IOException, SQLException {
+ private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException {
HRegion region = env.getRegion();
Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
PTable table = metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
- if (table != null || (table = buildTable(key, cacheKey, region, clientTimeStamp)) != null) {
+ if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
return table;
}
// if not found then check if newer table already exists and add delete marker for timestamp found
@@ -557,7 +558,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
ImmutableBytesPtr parentCacheKey = null;
if (parentKey != null) {
parentCacheKey = new ImmutableBytesPtr(parentKey);
- parentTable = loadTable(env, parentKey, parentCacheKey, clientTimeStamp);
+ parentTable = loadTable(env, parentKey, parentCacheKey, clientTimeStamp, clientTimeStamp);
if (parentTable == null || isTableDeleted(parentTable)) {
return new MetaDataMutationResult(MutationCode.PARENT_TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
}
@@ -570,14 +571,17 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
// Get as of latest timestamp so we can detect if we have a newer table that already exists
// without making an additional query
- PTable table = loadTable(env, key, cacheKey, clientTimeStamp);
+ if (logger.isDebugEnabled()) logger.debug("Loading " + SchemaUtil.getTableName(schemaName, lockTableName) + " as of timestamp " + clientTimeStamp);
+ PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
if (table != null) {
+ if (logger.isDebugEnabled()) logger.debug("Found " + table.getName().getString() + " with timestamp of " + table.getTimeStamp());
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and it's deleted, continue
if (!isTableDeleted(table)) {
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
}
} else {
+ if (logger.isDebugEnabled()) logger.debug("Returning NEWER_TABLE_FOUND result for " + table.getName().getString());
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
}
}
[2/5] git commit: PHOENIX-1214 SYSTEM.CATALOG cannot be created when
first connection to cluster is tenant-specific (Jan Van Besien)
Posted by ja...@apache.org.
PHOENIX-1214 SYSTEM.CATALOG cannot be created when first connection to cluster is tenant-specific (Jan Van Besien)
Conflicts:
phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9833f072
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9833f072
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9833f072
Branch: refs/heads/3.0
Commit: 9833f072285ac5983a17426821a7a4cc3a5c50fd
Parents: 660b7d9
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Oct 15 22:18:07 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Oct 16 20:58:17 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CSVCommonsLoaderIT.java | 4 ++-
.../query/ConnectionQueryServicesImpl.java | 4 ++-
.../query/ConnectionlessQueryServicesImpl.java | 4 ++-
.../java/org/apache/phoenix/util/JDBCUtil.java | 16 +++++++++
.../apache/phoenix/jdbc/PhoenixDriverTest.java | 31 ++++++++---------
.../org/apache/phoenix/util/JDBCUtilTest.java | 35 ++++++++++++++++++++
6 files changed, 76 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9833f072/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
index 9f36b93..f51f908 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
@@ -30,10 +30,12 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Properties;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.schema.PArrayDataType;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.util.CSVCommonsLoader;
@@ -156,7 +158,7 @@ public class CSVCommonsLoaderIT extends BaseHBaseManagedTimeIT {
new StringReader(statements), null);
globalConn.close();
- tenantConn = DriverManager.getConnection(getUrl() + ";TenantId=acme").unwrap(
+ tenantConn = new PhoenixTestDriver().connect(getUrl() + ";TenantId=acme", new Properties()).unwrap(
PhoenixConnection.class);
// Upsert CSV file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9833f072/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 03ecbc9..3a03401 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -107,6 +107,7 @@ import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixContextExecutor;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -1318,8 +1319,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
metaConnection = new PhoenixConnection(
- ConnectionQueryServicesImpl.this, url, scnProps, newEmptyMetaData());
+ ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData());
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9833f072/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index dd0cf54..3cd7917 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -208,7 +209,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
Properties scnProps = PropertiesUtil.deepCopy(props);
scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
- metaConnection = new PhoenixConnection(this, url, scnProps, newEmptyMetaData());
+ String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
+ metaConnection = new PhoenixConnection(this, globalUrl, scnProps, newEmptyMetaData());
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
} catch (TableAlreadyExistsException ignore) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9833f072/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 6148417..ff7ec20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -64,6 +64,22 @@ public class JDBCUtil {
return propValue;
}
+ public static String removeProperty(String url, String propName) {
+ String urlPropName = ";" + propName + "=";
+ int begIndex = url.indexOf(urlPropName);
+ if (begIndex >= 0) {
+ int endIndex = url.indexOf(';', begIndex + urlPropName.length());
+ if (endIndex < 0) {
+ endIndex = url.length();
+ }
+ String prefix = url.substring(0, begIndex);
+ String suffix = url.substring(endIndex, url.length());
+ return prefix + suffix;
+ } else {
+ return url;
+ }
+ }
+
public static Long getCurrentSCN(String url, Properties info) throws SQLException {
String scnStr = findProperty(url, info, PhoenixRuntime.CURRENT_SCN_ATTRIB);
return (scnStr == null ? null : Long.parseLong(scnStr));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9833f072/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index bcb4f51..be40702 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -24,7 +24,6 @@ import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
-import java.sql.SQLException;
import java.util.Properties;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
@@ -37,10 +36,23 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
@Test
public void testFirstConnectionWhenPropsHasTenantId() throws Exception {
- final String url = getUrl();
- verifyConnectionValid(url);
+ Properties props = new Properties();
+ final String tenantId = "00Dxx0000001234";
+ props.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+
+ Connection connection = new PhoenixTestDriver().connect(getUrl(), props);
+ assertEquals(tenantId, connection.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB));
}
-
+
+ @Test
+ public void testFirstConnectionWhenUrlHasTenantId() throws Exception {
+ final String tenantId = "00Dxx0000001234";
+ String url = getUrl() + ";" + PhoenixRuntime.TENANT_ID_ATTRIB + "=" + tenantId;
+ Driver driver = new PhoenixTestDriver();
+
+ driver.connect(url, new Properties());
+ }
+
@Test
public void testMaxMutationSizeSetCorrectly() throws Exception {
Properties connectionProperties = new Properties();
@@ -59,15 +71,4 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
fail("Upsert should have failed since the number of upserts (200) is greater than the MAX_MUTATION_SIZE_ATTRIB (100)");
} catch (IllegalArgumentException expected) {}
}
-
- private void verifyConnectionValid(String url) throws SQLException {
- Driver driver = DriverManager.getDriver(url);
-
- Properties props = new Properties();
- final String tenantId = "00Dxx0000001234";
- props.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-
- Connection connection = driver.connect(url, props);
- assertEquals(tenantId, connection.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB));
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9833f072/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
new file mode 100644
index 0000000..28e830b
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class JDBCUtilTest {
+
+ @Test
+ public void testRemoveProperty() {
+ assertEquals("localhost;", JDBCUtil.removeProperty("localhost;TenantId=abc;", TENANT_ID_ATTRIB));
+ assertEquals("localhost;foo=bar", JDBCUtil.removeProperty("localhost;TenantId=abc;foo=bar", TENANT_ID_ATTRIB));
+ assertEquals("localhost;TenantId=abc", JDBCUtil.removeProperty("localhost;TenantId=abc;foo=bar", "foo"));
+ assertEquals("localhost;TenantId=abc;foo=bar", JDBCUtil.removeProperty("localhost;TenantId=abc;foo=bar", "bar"));
+ }
+}
[5/5] git commit: Merge branch '3.0' of
https://git-wip-us.apache.org/repos/asf/phoenix into 3.0
Posted by ja...@apache.org.
Merge branch '3.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 3.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3f254dc4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3f254dc4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3f254dc4
Branch: refs/heads/3.0
Commit: 3f254dc4d46ec3615d30a57c24b5ccc4a5dbe745
Parents: e33c99d 81c300c
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Oct 17 00:53:54 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Oct 17 00:53:54 2014 -0700
----------------------------------------------------------------------
.../ConnectionQueryServicesTestImpl.java | 2 +-
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 2 +-
.../phoenix/mapreduce/CsvBulkLoadTool.java | 7 ++++++-
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 21 +++++++++++++++++++-
.../query/ConnectionQueryServicesImpl.java | 9 ++++++++-
5 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3f254dc4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------